X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_openvim%2Fhost_thread.py;fp=osm_openvim%2Fhost_thread.py;h=d8bca2e2e1ca13d091a34566d29610672f520cb8;hb=9f6571090b203922cabb0382226be0fa48d6e046;hp=0000000000000000000000000000000000000000;hpb=ee19576ffab403bf0218974ed96c98dc0375b507;p=osm%2Fopenvim.git diff --git a/osm_openvim/host_thread.py b/osm_openvim/host_thread.py new file mode 100644 index 0000000..d8bca2e --- /dev/null +++ b/osm_openvim/host_thread.py @@ -0,0 +1,2274 @@ +# -*- coding: utf-8 -*- + +## +# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. +# This file is part of openvim +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +''' +This is thread that interact with the host and the libvirt to manage VM +One thread will be launched per host +''' +__author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal" +__date__ = "$10-jul-2014 12:07:15$" + +import json +import yaml +import threading +import time +import Queue +import paramiko +from jsonschema import validate as js_v, exceptions as js_e +#import libvirt +import imp +from vim_schema import localinfo_schema, hostinfo_schema +import random +import os + +#TODO: insert a logging system + +# from logging import Logger +# import auxiliary_functions as af + +# TODO: insert a logging system + + +class host_thread(threading.Thread): + lvirt_module = None + + def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, + develop_bridge_iface): + '''Init a thread. + Arguments: + 'id' number of thead + 'name' name of thread + 'host','user': host ip or name to manage and user + 'db', 'db_lock': database class and lock to use it in exclusion + ''' + threading.Thread.__init__(self) + self.name = name + self.host = host + self.user = user + self.db = db + self.db_lock = db_lock + self.test = test + + if not test and not host_thread.lvirt_module: + try: + module_info = imp.find_module("libvirt") + host_thread.lvirt_module = imp.load_module("libvirt", *module_info) + except (IOError, ImportError) as e: + raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e)) + + + self.develop_mode = develop_mode + self.develop_bridge_iface = develop_bridge_iface + self.image_path = image_path + self.host_id = host_id + self.version = version + + self.xml_level = 0 + #self.pending ={} + + self.server_status = {} #dictionary with pairs server_uuid:server_status + self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed + self.next_update_server_status = 0 #time when must be check servers status + + self.hostinfo = None + + self.queueLock = threading.Lock() + self.taskQueue = Queue.Queue(2000) + self.ssh_conn = None + + def ssh_connect(self): + try: + #Connect SSH + self.ssh_conn = paramiko.SSHClient() + self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.ssh_conn.load_system_host_keys() + self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None) + except paramiko.ssh_exception.SSHException as e: + text = e.args[0] + print self.name, ": ssh_connect ssh Exception:", text + + def load_localinfo(self): + if not self.test: + try: + #Connect SSH + self.ssh_connect() + + command = 'mkdir -p ' + self.image_path + #print self.name, ': command:', command + (_, stdout, stderr) = self.ssh_conn.exec_command(command) + content = stderr.read() + if len(content) > 0: + print self.name, ': command:', command, "stderr:", content + + command = 'cat ' + self.image_path + '/.openvim.yaml' + #print self.name, ': command:', command + (_, stdout, stderr) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + print self.name, ': command:', command, "stderr:", stderr.read() + raise paramiko.ssh_exception.SSHException("Error empty file ") + self.localinfo = yaml.load(content) + js_v(self.localinfo, localinfo_schema) + self.localinfo_dirty=False + if 'server_files' not in self.localinfo: + self.localinfo['server_files'] = {} + print self.name, ': localinfo load from host' + return + + except paramiko.ssh_exception.SSHException as e: + text = e.args[0] + print self.name, ": load_localinfo ssh Exception:", text + except host_thread.lvirt_module.libvirtError as e: + text = e.get_error_message() + print self.name, ": load_localinfo libvirt Exception:", text + except yaml.YAMLError as exc: + text = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + text = " at position: (%s:%s)" % (mark.line+1, mark.column+1) + print self.name, ": load_localinfo yaml format Exception", text + except js_e.ValidationError as e: + text = "" + if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'" + print self.name, ": load_localinfo format Exception:", text, e.message + except Exception as e: + text = str(e) + print self.name, ": load_localinfo Exception:", text + + #not loaded, insert a default data and force saving by activating dirty flag + self.localinfo = {'files':{}, 'server_files':{} } + #self.localinfo_dirty=True + self.localinfo_dirty=False + + def load_hostinfo(self): + if self.test: + return; + try: + #Connect SSH + self.ssh_connect() + + + command = 'cat ' + self.image_path + '/hostinfo.yaml' + #print self.name, ': command:', command + (_, stdout, stderr) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + print self.name, ': command:', command, "stderr:", stderr.read() + raise paramiko.ssh_exception.SSHException("Error empty file ") + self.hostinfo = yaml.load(content) + js_v(self.hostinfo, hostinfo_schema) + print self.name, ': hostlinfo load from host', self.hostinfo + return + + except paramiko.ssh_exception.SSHException as e: + text = e.args[0] + print self.name, ": load_hostinfo ssh Exception:", text + except host_thread.lvirt_module.libvirtError as e: + text = e.get_error_message() + print self.name, ": load_hostinfo libvirt Exception:", text + except yaml.YAMLError as exc: + text = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + text = " at position: (%s:%s)" % (mark.line+1, mark.column+1) + print self.name, ": load_hostinfo yaml format Exception", text + except js_e.ValidationError as e: + text = "" + if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'" + print self.name, ": load_hostinfo format Exception:", text, e.message + except Exception as e: + text = str(e) + print self.name, ": load_hostinfo Exception:", text + + #not loaded, insert a default data + self.hostinfo = None + + def save_localinfo(self, tries=3): + if self.test: + self.localinfo_dirty = False + return + + while tries>=0: + tries-=1 + + try: + command = 'cat > ' + self.image_path + '/.openvim.yaml' + print self.name, ': command:', command + (stdin, _, _) = self.ssh_conn.exec_command(command) + yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True) + self.localinfo_dirty = False + break #while tries + + except paramiko.ssh_exception.SSHException as e: + text = e.args[0] + print self.name, ": save_localinfo ssh Exception:", text + if "SSH session not active" in text: + self.ssh_connect() + except host_thread.lvirt_module.libvirtError as e: + text = e.get_error_message() + print self.name, ": save_localinfo libvirt Exception:", text + except yaml.YAMLError as exc: + text = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + text = " at position: (%s:%s)" % (mark.line+1, mark.column+1) + print self.name, ": save_localinfo yaml format Exception", text + except Exception as e: + text = str(e) + print self.name, ": save_localinfo Exception:", text + + def load_servers_from_db(self): + self.db_lock.acquire() + r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id}) + self.db_lock.release() + + self.server_status = {} + if r<0: + print self.name, ": Error getting data from database:", c + return + for server in c: + self.server_status[ server['uuid'] ] = server['status'] + + #convert from old version to new one + if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']: + server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' } + if server_files_dict['source file'][-5:] == 'qcow2': + server_files_dict['file format'] = 'qcow2' + + self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict } + if 'inc_files' in self.localinfo: + del self.localinfo['inc_files'] + self.localinfo_dirty = True + + def delete_unused_files(self): + '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database + Deletes unused entries at self.loacalinfo and the corresponding local files. + The only reason for this mismatch is the manual deletion of instances (VM) at database + ''' + if self.test: + return + for uuid,images in self.localinfo['server_files'].items(): + if uuid not in self.server_status: + for localfile in images.values(): + try: + print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid) + self.delete_file(localfile['source file']) + except paramiko.ssh_exception.SSHException as e: + print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e)) + del self.localinfo['server_files'][uuid] + self.localinfo_dirty = True + + def insert_task(self, task, *aditional): + try: + self.queueLock.acquire() + task = self.taskQueue.put( (task,) + aditional, timeout=5) + self.queueLock.release() + return 1, None + except Queue.Full: + return -1, "timeout inserting a task over host " + self.name + + def run(self): + while True: + self.load_localinfo() + self.load_hostinfo() + self.load_servers_from_db() + self.delete_unused_files() + while True: + self.queueLock.acquire() + if not self.taskQueue.empty(): + task = self.taskQueue.get() + else: + task = None + self.queueLock.release() + + if task is None: + now=time.time() + if self.localinfo_dirty: + self.save_localinfo() + elif self.next_update_server_status < now: + self.update_servers_status() + self.next_update_server_status = now + 5 + elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]=0: + break + elif task[0] == 'image': + pass + elif task[0] == 'exit': + print self.name, ": processing task exit" + self.terminate() + return 0 + elif task[0] == 'reload': + print self.name, ": processing task reload terminating and relaunching" + self.terminate() + break + elif task[0] == 'edit-iface': + print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3]) + self.edit_iface(task[1], task[2], task[3]) + elif task[0] == 'restore-iface': + print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2]) + self.restore_iface(task[1], task[2]) + elif task[0] == 'new-ovsbridge': + print self.name, ": Creating compute OVS bridge" + self.create_ovs_bridge() + elif task[0] == 'new-vxlan': + print self.name, ": Creating vxlan tunnel=" + task[1] + ", remote ip=" + task[2] + self.create_ovs_vxlan_tunnel(task[1], task[2]) + elif task[0] == 'del-ovsbridge': + print self.name, ": Deleting OVS bridge" + self.delete_ovs_bridge() + elif task[0] == 'del-vxlan': + print self.name, ": Deleting vxlan " + task[1] + " tunnel" + self.delete_ovs_vxlan_tunnel(task[1]) + elif task[0] == 'create-ovs-bridge-port': + print self.name, ": Adding port ovim-" + task[1] + " to OVS bridge" + self.create_ovs_bridge_port(task[1]) + elif task[0] == 'del-ovs-port': + print self.name, ": Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2]) + self.delete_bridge_port_attached_to_ovs(task[1], task[2]) + else: + print self.name, ": unknown task", task + + def server_forceoff(self, wait_until_finished=False): + while len(self.pending_terminate_server)>0: + now = time.time() + if self.pending_terminate_server[0][0]>now: + if wait_until_finished: + time.sleep(1) + continue + else: + return + req={'uuid':self.pending_terminate_server[0][1], + 'action':{'terminate':'force'}, + 'status': None + } + self.action_on_server(req) + self.pending_terminate_server.pop(0) + + def terminate(self): + try: + self.server_forceoff(True) + if self.localinfo_dirty: + self.save_localinfo() + if not self.test: + self.ssh_conn.close() + except Exception as e: + text = str(e) + print self.name, ": terminate Exception:", text + print self.name, ": exit from host_thread" + + def get_local_iface_name(self, generic_name): + if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]: + return self.hostinfo["iface_names"][generic_name] + return generic_name + + def create_xml_server(self, server, dev_list, server_metadata={}): + """Function that implements the generation of the VM XML definition. + Additional devices are in dev_list list + The main disk is upon dev_list[0]""" + + #get if operating system is Windows + windows_os = False + os_type = server_metadata.get('os_type', None) + if os_type == None and 'metadata' in dev_list[0]: + os_type = dev_list[0]['metadata'].get('os_type', None) + if os_type != None and os_type.lower() == "windows": + windows_os = True + #get type of hard disk bus + bus_ide = True if windows_os else False + bus = server_metadata.get('bus', None) + if bus == None and 'metadata' in dev_list[0]: + bus = dev_list[0]['metadata'].get('bus', None) + if bus != None: + bus_ide = True if bus=='ide' else False + + self.xml_level = 0 + + text = "" + #get topology + topo = server_metadata.get('topology', None) + if topo == None and 'metadata' in dev_list[0]: + topo = dev_list[0]['metadata'].get('topology', None) + #name + name = server.get('name','') + "_" + server['uuid'] + name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58 + text += self.inc_tab() + "" + name+ "" + #uuid + text += self.tab() + "" + server['uuid'] + "" + + numa={} + if 'extended' in server and server['extended']!=None and 'numas' in server['extended']: + numa = server['extended']['numas'][0] + #memory + use_huge = False + memory = int(numa.get('memory',0))*1024*1024 #in KiB + if memory==0: + memory = int(server['ram'])*1024; + else: + if not self.develop_mode: + use_huge = True + if memory==0: + return -1, 'No memory assigned to instance' + memory = str(memory) + text += self.tab() + "" +memory+"" + text += self.tab() + "" +memory+ "" + if use_huge: + text += self.tab()+''+ \ + self.inc_tab() + ''+ \ + self.dec_tab()+ '' + + #cpu + use_cpu_pinning=False + vcpus = int(server.get("vcpus",0)) + cpu_pinning = [] + if 'cores-source' in numa: + use_cpu_pinning=True + for index in range(0, len(numa['cores-source'])): + cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] ) + vcpus += 1 + if 'threads-source' in numa: + use_cpu_pinning=True + for index in range(0, len(numa['threads-source'])): + cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] ) + vcpus += 1 + if 'paired-threads-source' in numa: + use_cpu_pinning=True + for index in range(0, len(numa['paired-threads-source'])): + cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] ) + cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] ) + vcpus += 2 + + if use_cpu_pinning and not self.develop_mode: + text += self.tab()+"" +str(len(cpu_pinning)) +"" + \ + self.tab()+'' + self.xml_level += 1 + for i in range(0, len(cpu_pinning)): + text += self.tab() + "" + text += self.dec_tab()+''+ \ + self.tab() + '' +\ + self.inc_tab() + "" +\ + self.dec_tab() + '' + else: + if vcpus==0: + return -1, "Instance without number of cpus" + text += self.tab()+"" + str(vcpus) + "" + + #boot + boot_cdrom = False + for dev in dev_list: + if dev['type']=='cdrom' : + boot_cdrom = True + break + text += self.tab()+ '' + \ + self.inc_tab() + "hvm" + if boot_cdrom: + text += self.tab() + "" + text += self.tab() + "" + \ + self.dec_tab()+'' + #features + text += self.tab()+''+\ + self.inc_tab()+'' +\ + self.tab()+'' +\ + self.tab()+''+ \ + self.dec_tab() +'' + if topo == "oneSocket:hyperthreading": + if vcpus % 2 != 0: + return -1, 'Cannot expose hyperthreading with an odd number of vcpus' + text += self.tab() + " " % vcpus/2 + elif windows_os or topo == "oneSocket": + text += self.tab() + " " % vcpus + else: + text += self.tab() + "" + text += self.tab() + "" +\ + self.tab() + "preserve" + \ + self.tab() + "restart" + \ + self.tab() + "restart" + text += self.tab() + "" + \ + self.inc_tab() + "/usr/libexec/qemu-kvm" + \ + self.tab() + "" +\ + self.inc_tab() + "" + \ + self.dec_tab() + "" +\ + self.tab() + "" + \ + self.inc_tab()+ "" + \ + self.dec_tab()+'' + if windows_os: + text += self.tab() + "" + \ + self.tab() + "" + \ + self.tab() + "" + \ + self.tab() + "" + \ + self.tab() + "" + \ + self.tab() + "" + \ + self.tab() + "" #TODO revisar + +#> self.tab()+'\n' +\ +#> self.dec_tab()+'\n' +\ +#> self.tab()+'\n' + if windows_os: + text += self.tab() + "" + else: + #If image contains 'GRAPH' include graphics + #if 'GRAPH' in image: + text += self.tab() + "" +\ + self.inc_tab() + "" +\ + self.dec_tab() + "" + + vd_index = 'a' + for dev in dev_list: + bus_ide_dev = bus_ide + if dev['type']=='cdrom' or dev['type']=='disk': + if dev['type']=='cdrom': + bus_ide_dev = True + text += self.tab() + "" + if 'file format' in dev: + text += self.inc_tab() + "" + if 'source file' in dev: + text += self.tab() + "" + #elif v['type'] == 'block': + # text += self.tab() + "" + #else: + # return -1, 'Unknown disk type ' + v['type'] + vpci = dev.get('vpci',None) + if vpci == None: + vpci = dev['metadata'].get('vpci',None) + text += self.pci2xml(vpci) + + if bus_ide_dev: + text += self.tab() + "" #TODO allows several type of disks + else: + text += self.tab() + "" + text += self.dec_tab() + '' + vd_index = chr(ord(vd_index)+1) + elif dev['type']=='xml': + dev_text = dev['xml'] + if 'vpci' in dev: + dev_text = dev_text.replace('__vpci__', dev['vpci']) + if 'source file' in dev: + dev_text = dev_text.replace('__file__', dev['source file']) + if 'file format' in dev: + dev_text = dev_text.replace('__format__', dev['source file']) + if '__dev__' in dev_text: + dev_text = dev_text.replace('__dev__', vd_index) + vd_index = chr(ord(vd_index)+1) + text += dev_text + else: + return -1, 'Unknown device type ' + dev['type'] + + net_nb=0 + bridge_interfaces = server.get('networks', []) + for v in bridge_interfaces: + #Get the brifge name + self.db_lock.acquire() + result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} ) + self.db_lock.release() + if result <= 0: + print "create_xml_server ERROR getting nets",result, content + return -1, content + #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM + #I know it is not secure + #for v in sorted(desc['network interfaces'].itervalues()): + model = v.get("model", None) + if content[0]['provider']=='default': + text += self.tab() + "" + \ + self.inc_tab() + "" + elif content[0]['provider'][0:7]=='macvtap': + text += self.tab()+"" + \ + self.inc_tab() + "" + \ + self.tab() + "" + if windows_os: + text += self.tab() + "" + elif model==None: + model = "virtio" + elif content[0]['provider'][0:6]=='bridge': + text += self.tab() + "" + \ + self.inc_tab()+"" + if windows_os: + text += self.tab() + "" +\ + self.tab() + "" + elif model==None: + model = "virtio" + elif content[0]['provider'][0:3] == "OVS": + vlan = content[0]['provider'].replace('OVS:', '') + text += self.tab() + "" + \ + self.inc_tab() + "" + else: + return -1, 'Unknown Bridge net provider ' + content[0]['provider'] + if model!=None: + text += self.tab() + "" + if v.get('mac_address', None) != None: + text+= self.tab() +"" + text += self.pci2xml(v.get('vpci',None)) + text += self.dec_tab()+'' + + net_nb += 1 + + interfaces = numa.get('interfaces', []) + + net_nb=0 + for v in interfaces: + if self.develop_mode: #map these interfaces to bridges + text += self.tab() + "" + \ + self.inc_tab()+"" + if windows_os: + text += self.tab() + "" +\ + self.tab() + "" + else: + text += self.tab() + "" #e1000 is more probable to be supported than 'virtio' + if v.get('mac_address', None) != None: + text+= self.tab() +"" + text += self.pci2xml(v.get('vpci',None)) + text += self.dec_tab()+'' + continue + + if v['dedicated'] == 'yes': #passthrought + text += self.tab() + "" + \ + self.inc_tab() + "" + self.inc_tab() + text += self.pci2xml(v['source']) + text += self.dec_tab()+'' + text += self.pci2xml(v.get('vpci',None)) + if windows_os: + text += self.tab() + "" + text += self.dec_tab()+'' + net_nb += 1 + else: #sriov_interfaces + #skip not connected interfaces + if v.get("net_id") == None: + continue + text += self.tab() + "" + self.inc_tab() + if v.get('mac_address', None) != None: + text+= self.tab() + "" + text+= self.tab()+'' + self.inc_tab() + text += self.pci2xml(v['source']) + text += self.dec_tab()+'' + if v.get('vlan',None) != None: + text += self.tab() + " " + text += self.pci2xml(v.get('vpci',None)) + if windows_os: + text += self.tab() + "" + text += self.dec_tab()+'' + + + text += self.dec_tab()+''+\ + self.dec_tab()+'' + return 0, text + + def pci2xml(self, pci): + '''from a pci format text XXXX:XX:XX.X generates the xml content of
+ alows an empty pci text''' + if pci is None: + return "" + first_part = pci.split(':') + second_part = first_part[2].split('.') + return self.tab() + "
" + + def tab(self): + """Return indentation according to xml_level""" + return "\n" + (' '*self.xml_level) + + def inc_tab(self): + """Increment and return indentation according to xml_level""" + self.xml_level += 1 + return self.tab() + + def dec_tab(self): + """Decrement and return indentation according to xml_level""" + self.xml_level -= 1 + return self.tab() + + def create_ovs_bridge(self): + """ + Create a bridge in compute OVS to allocate VMs + :return: True if success + """ + if self.test: + return + command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + return True + else: + return False + + def delete_port_to_ovs_bridge(self, vlan, net_uuid): + """ + Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed + :param vlan: vlan port id + :param net_uuid: network id + :return: + """ + + if self.test: + return + + port_name = 'ovim-' + vlan + command = 'sudo ovs-vsctl del-port br-int ' + port_name + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + return True + else: + return False + + def delete_dhcp_server(self, vlan, net_uuid, dhcp_path): + """ + Delete dhcp server process lining in namespace + :param vlan: segmentation id + :param net_uuid: network uuid + :param dhcp_path: conf fiel path that live in namespace side + :return: + """ + if self.test: + return + if not self.is_dhcp_port_free(vlan, net_uuid): + return True + + net_namespace = 'ovim-' + vlan + dhcp_path = os.path.join(dhcp_path, net_namespace) + pid_file = os.path.join(dhcp_path, 'dnsmasq.pid') + + command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + # if len(content) == 0: + # return True + # else: + # return False + + def is_dhcp_port_free(self, host_id, net_uuid): + """ + Check if any port attached to the a net in a vxlan mesh across computes nodes + :param host_id: host id + :param net_uuid: network id + :return: True if is not free + """ + self.db_lock.acquire() + result, content = self.db.get_table( + FROM='ports', + WHERE={'p.type': 'instance:ovs', 'p.net_id': net_uuid} + ) + self.db_lock.release() + + if len(content) > 0: + return False + else: + return True + + def is_port_free(self, host_id, net_uuid): + """ + Check if there not ovs ports of a network in a compute host. + :param host_id: host id + :param net_uuid: network id + :return: True if is not free + """ + + self.db_lock.acquire() + result, content = self.db.get_table( + FROM='ports as p join instances as i on p.instance_id=i.uuid', + WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid} + ) + self.db_lock.release() + + if len(content) > 0: + return False + else: + return True + + def add_port_to_ovs_bridge(self, vlan): + """ + Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge + :param vlan: vlan port id + :return: True if success + """ + + if self.test: + return + + port_name = 'ovim-' + vlan + command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + vlan + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + return True + else: + return False + + def delete_dhcp_port(self, vlan, net_uuid): + """ + Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself. + :param vlan: segmentation id + :param net_uuid: network id + :return: True if success + """ + + if self.test: + return + + if not self.is_dhcp_port_free(vlan, net_uuid): + return True + self.delete_dhcp_interfaces(vlan) + return True + + def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid): + """ + Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself. + :param vlan: + :param net_uuid: + :return: True if success + """ + if self.test: + return + + if not self.is_port_free(vlan, net_uuid): + return True + self.delete_port_to_ovs_bridge(vlan, net_uuid) + self.delete_linux_bridge(vlan) + return True + + def delete_linux_bridge(self, vlan): + """ + Delete a linux bridge in a scpecific compute. + :param vlan: vlan port id + :return: True if success + """ + + if self.test: + return + + port_name = 'ovim-' + vlan + command = 'sudo ip link set dev veth0-' + vlan + ' down' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + # + # if len(content) != 0: + # return False + + command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + return True + else: + return False + + def create_ovs_bridge_port(self, vlan): + """ + Generate a linux bridge and attache the port to a OVS bridge + :param vlan: vlan port id + :return: + """ + if self.test: + return + self.create_linux_bridge(vlan) + self.add_port_to_ovs_bridge(vlan) + + def create_linux_bridge(self, vlan): + """ + Create a linux bridge with STP active + :param vlan: netowrk vlan id + :return: + """ + + if self.test: + return + + port_name = 'ovim-' + vlan + command = 'sudo brctl show | grep ' + port_name + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + # if exist nothing to create + # if len(content) == 0: + # return False + + command = 'sudo brctl addbr ' + port_name + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + # if len(content) == 0: + # return True + # else: + # return False + + command = 'sudo brctl stp ' + port_name + ' on' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + # if len(content) == 0: + # return True + # else: + # return False + command = 'sudo ip link set dev ' + port_name + ' up' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + if len(content) == 0: + return True + else: + return False + + def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path): + """ + Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address + :param ip: IP address asigned to a VM + :param mac: VM vnic mac to be macthed with the IP received + :param vlan: Segmentation id + :param netmask: netmask value + :param path: dhcp conf file path that live in namespace side + :return: True if success + """ + + if self.test: + return + + net_namespace = 'ovim-' + vlan + dhcp_path = os.path.join(dhcp_path, net_namespace) + dhcp_hostsdir = os.path.join(dhcp_path, net_namespace) + + if not ip: + return False + + ip_data = mac.upper() + ',' + ip + + command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"' + + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + if len(content) == 0: + return True + else: + return False + + def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path): + """ + Delete into dhcp conf file the ip assigned to a specific MAC address + + :param ip: IP address asigned to a VM + :param mac: VM vnic mac to be macthed with the IP received + :param vlan: Segmentation id + :param dhcp_path: dhcp conf file path that live in namespace side + :return: + """ + + if self.test: + return + + net_namespace = 'ovim-' + vlan + dhcp_path = os.path.join(dhcp_path, net_namespace) + dhcp_hostsdir = os.path.join(dhcp_path, net_namespace) + + if not ip: + return False + + ip_data = mac.upper() + ',' + ip + + command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + if len(content) == 0: + return True + else: + return False + + def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway): + """ + Generate a linux bridge and attache the port to a OVS bridge + :param self: + :param vlan: Segmentation id + :param ip_range: IP dhcp range + :param netmask: network netmask + :param dhcp_path: dhcp conf file path that live in namespace side + :param gateway: Gateway address for dhcp net + :return: True if success + """ + + if self.test: + return + + interface = 'tap-' + vlan + net_namespace = 'ovim-' + vlan + dhcp_path = os.path.join(dhcp_path, net_namespace) + leases_path = os.path.join(dhcp_path, "dnsmasq.leases") + pid_file = os.path.join(dhcp_path, 'dnsmasq.pid') + + dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask + + command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + pid_path = os.path.join(dhcp_path, 'dnsmasq.pid') + command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + # check if pid is runing + pid_status_path = content + if content: + command = "ps aux | awk '{print $2 }' | grep " + pid_status_path + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if not content: + command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \ + '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \ + ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \ + ' --listen-address ' + gateway + + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.readline() + + if len(content) == 0: + return True + else: + return False + + def delete_dhcp_interfaces(self, vlan): + """ + Create a linux bridge with STP active + :param vlan: netowrk vlan id + :return: + """ + + if self.test: + return + + net_namespace = 'ovim-' + vlan + command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + vlan + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + vlan + ' down' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip link set dev ovs-tap-' + vlan + ' down' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + def create_dhcp_interfaces(self, vlan, ip, netmask): + """ + Create a linux bridge with STP active + :param vlan: segmentation id + :param ip: Ip included in the dhcp range for the tap interface living in namesapce side + :param netmask: dhcp net CIDR + :return: True if success + """ + + if self.test: + return + + net_namespace = 'ovim-' + vlan + namespace_interface = 'tap-' + vlan + + command = 'sudo ip netns add ' + net_namespace + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip link add tap-' + vlan + ' type veth peer name ovs-tap-' + vlan + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + vlan + ' tag=' + vlan + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip link set tap-' + vlan + ' netns ' + net_namespace + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + vlan + ' up' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip link set dev ovs-tap-' + vlan + ' up' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \ + + ' ' + ip + ' netmask ' + netmask + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + + if len(content) == 0: + return True + else: + return False + + def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip): + """ + Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level + :param vxlan_interface: vlxan inteface name. + :param remote_ip: tunnel endpoint remote compute ip. + :return: + """ + if self.test: + return + command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \ + ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \ + ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + print content + if len(content) == 0: + return True + else: + return False + + def delete_ovs_vxlan_tunnel(self, vxlan_interface): + """ + Delete a vlxan tunnel port from a OVS brdige. + :param vxlan_interface: vlxan name to be delete it. + :return: True if success. + """ + if self.test: + return + command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + print content + if len(content) == 0: + return True + else: + return False + + def delete_ovs_bridge(self): + """ + Delete a OVS bridge from a compute. + :return: True if success + """ + if self.test: + return + command = 'sudo ovs-vsctl del-br br-int' + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + return True + else: + return False + + def get_file_info(self, path): + command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path + print self.name, ': command:', command + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + return None # file does not exist + else: + return content.split(" ") #(permission, 1, owner, group, size, date, file) + + def qemu_get_info(self, path): + command = 'qemu-img info ' + path + print self.name, ': command:', command + (_, stdout, stderr) = self.ssh_conn.exec_command(command) + content = stdout.read() + if len(content) == 0: + error = stderr.read() + print self.name, ": get_qemu_info error ", error + raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error) + else: + try: + return yaml.load(content) + except yaml.YAMLError as exc: + text = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + text = " at position: (%s:%s)" % (mark.line+1, mark.column+1) + print self.name, ": get_qemu_info yaml format Exception", text + raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text) + + def qemu_change_backing(self, inc_file, new_backing_file): + command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file + print self.name, ': command:', command + (_, _, stderr) = self.ssh_conn.exec_command(command) + content = stderr.read() + if len(content) == 0: + return 0 + else: + print self.name, ": qemu_change_backing error: ", content + return -1 + + def get_notused_filename(self, proposed_name, suffix=''): + '''Look for a non existing file_name in the host + proposed_name: proposed file name, includes path + suffix: suffix to be added to the name, before the extention + ''' + extension = proposed_name.rfind(".") + slash = proposed_name.rfind("/") + if extension < 0 or extension < slash: # no extension + extension = len(proposed_name) + target_name = proposed_name[:extension] + suffix + proposed_name[extension:] + info = self.get_file_info(target_name) + if info is None: + return target_name + + index=0 + while info is not None: + target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:] + index+=1 + info = self.get_file_info(target_name) + return target_name + + def get_notused_path(self, proposed_path, suffix=''): + '''Look for a non existing path at database for images + proposed_path: proposed file name, includes path + suffix: suffix to be added to the name, before the extention + ''' + extension = proposed_path.rfind(".") + if extension < 0: + extension = len(proposed_path) + if suffix != None: + target_path = proposed_path[:extension] + suffix + proposed_path[extension:] + index=0 + while True: + r,_=self.db.get_table(FROM="images",WHERE={"path":target_path}) + if r<=0: + return target_path + target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:] + index+=1 + + + def delete_file(self, file_name): + command = 'rm -f '+file_name + print self.name, ': command:', command + (_, _, stderr) = self.ssh_conn.exec_command(command) + error_msg = stderr.read() + if len(error_msg) > 0: + raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg) + + def copy_file(self, source, destination, perserve_time=True): + if source[0:4]=="http": + command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format( + dst=destination, src=source, dst_result=destination + ".result" ) + else: + command = 'cp --no-preserve=mode' + if perserve_time: + command += ' --preserve=timestamps' + command += " '{}' '{}'".format(source, destination) + print self.name, ': command:', command + (_, _, stderr) = self.ssh_conn.exec_command(command) + error_msg = stderr.read() + if len(error_msg) > 0: + raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg) + + def copy_remote_file(self, remote_file, use_incremental): + ''' Copy a file from the repository to local folder and recursively + copy the backing files in case the remote file is incremental + Read and/or modified self.localinfo['files'] that contain the + unmodified copies of images in the local path + params: + remote_file: path of remote file + use_incremental: None (leave the decision to this function), True, False + return: + local_file: name of local file + qemu_info: dict with quemu information of local file + use_incremental_out: True, False; same as use_incremental, but if None a decision is taken + ''' + + use_incremental_out = use_incremental + new_backing_file = None + local_file = None + file_from_local = True + + #in case incremental use is not decided, take the decision depending on the image + #avoid the use of incremental if this image is already incremental + if remote_file[0:4] == "http": + file_from_local = False + if file_from_local: + qemu_remote_info = self.qemu_get_info(remote_file) + if use_incremental_out==None: + use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info) + #copy recursivelly the backing files + if file_from_local and 'backing file' in qemu_remote_info: + new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True) + + #check if remote file is present locally + if use_incremental_out and remote_file in self.localinfo['files']: + local_file = self.localinfo['files'][remote_file] + local_file_info = self.get_file_info(local_file) + if file_from_local: + remote_file_info = self.get_file_info(remote_file) + if local_file_info == None: + local_file = None + elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]): + #local copy of file not valid because date or size are different. + #TODO DELETE local file if this file is not used by any active virtual machine + try: + self.delete_file(local_file) + del self.localinfo['files'][remote_file] + except Exception: + pass + local_file = None + else: #check that the local file has the same backing file, or there are not backing at all + qemu_info = self.qemu_get_info(local_file) + if new_backing_file != qemu_info.get('backing file'): + local_file = None + + + if local_file == None: #copy the file + img_name= remote_file.split('/') [-1] + img_local = self.image_path + '/' + img_name + local_file = self.get_notused_filename(img_local) + self.copy_file(remote_file, local_file, use_incremental_out) + + if use_incremental_out: + self.localinfo['files'][remote_file] = local_file + if new_backing_file: + self.qemu_change_backing(local_file, new_backing_file) + qemu_info = self.qemu_get_info(local_file) + + return local_file, qemu_info, use_incremental_out + + def launch_server(self, conn, server, rebuild=False, domain=None): + if self.test: + time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real + return 0, 'Success' + + server_id = server['uuid'] + paused = server.get('paused','no') + try: + if domain!=None and rebuild==False: + domain.resume() + #self.server_status[server_id] = 'ACTIVE' + return 0, 'Success' + + self.db_lock.acquire() + result, server_data = self.db.get_instance(server_id) + self.db_lock.release() + if result <= 0: + print self.name, ": launch_server ERROR getting server from DB",result, server_data + return result, server_data + + #0: get image metadata + server_metadata = server.get('metadata', {}) + use_incremental = None + + if "use_incremental" in server_metadata: + use_incremental = False if server_metadata["use_incremental"]=="no" else True + + server_host_files = self.localinfo['server_files'].get( server['uuid'], {}) + if rebuild: + #delete previous incremental files + for file_ in server_host_files.values(): + self.delete_file(file_['source file'] ) + server_host_files={} + + #1: obtain aditional devices (disks) + #Put as first device the main disk + devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ] + if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']: + devices += server_data['extended']['devices'] + + for dev in devices: + if dev['image_id'] == None: + continue + + self.db_lock.acquire() + result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'), + WHERE={'uuid': dev['image_id']}) + self.db_lock.release() + if result <= 0: + error_text = "ERROR", result, content, "when getting image", dev['image_id'] + print self.name, ": launch_server", error_text + return -1, error_text + if content[0]['metadata'] is not None: + dev['metadata'] = json.loads(content[0]['metadata']) + else: + dev['metadata'] = {} + + if dev['image_id'] in server_host_files: + dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path + dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2 + continue + + #2: copy image to host + remote_file = content[0]['path'] + use_incremental_image = use_incremental + if dev['metadata'].get("use_incremental") == "no": + use_incremental_image = False + local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image) + + #create incremental image + if use_incremental_image: + local_file_inc = self.get_notused_filename(local_file, '.inc') + command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file + print 'command:', command + (_, _, stderr) = self.ssh_conn.exec_command(command) + error_msg = stderr.read() + if len(error_msg) > 0: + raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg) + local_file = local_file_inc + qemu_info = {'file format':'qcow2'} + + server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']} + + dev['source file'] = local_file + dev['file format'] = qemu_info['file format'] + + self.localinfo['server_files'][ server['uuid'] ] = server_host_files + self.localinfo_dirty = True + + #3 Create XML + result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file + if result <0: + print self.name, ": create xml server error:", xml + return -2, xml + print self.name, ": create xml:", xml + atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0 + #4 Start the domain + if not rebuild: #ensures that any pending destroying server is done + self.server_forceoff(True) + #print self.name, ": launching instance" #, xml + conn.createXML(xml, atribute) + #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE' + + return 0, 'Success' + + except paramiko.ssh_exception.SSHException as e: + text = e.args[0] + print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text) + if "SSH session not active" in text: + self.ssh_connect() + except host_thread.lvirt_module.libvirtError as e: + text = e.get_error_message() + print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text) + except Exception as e: + text = str(e) + print self.name, ": launch_server(%s) Exception: %s" %(server_id, text) + return -1, text + + def update_servers_status(self): + # # virDomainState + # VIR_DOMAIN_NOSTATE = 0 + # VIR_DOMAIN_RUNNING = 1 + # VIR_DOMAIN_BLOCKED = 2 + # VIR_DOMAIN_PAUSED = 3 + # VIR_DOMAIN_SHUTDOWN = 4 + # VIR_DOMAIN_SHUTOFF = 5 + # VIR_DOMAIN_CRASHED = 6 + # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended + + if self.test or len(self.server_status)==0: + return + + try: + conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + domains= conn.listAllDomains() + domain_dict={} + for domain in domains: + uuid = domain.UUIDString() ; + libvirt_status = domain.state() + #print libvirt_status + if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN: + new_status = "ACTIVE" + elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED: + new_status = "PAUSED" + elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF: + new_status = "INACTIVE" + elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED: + new_status = "ERROR" + else: + new_status = None + domain_dict[uuid] = new_status + conn.close() + except host_thread.lvirt_module.libvirtError as e: + print self.name, ": get_state() Exception '", e.get_error_message() + return + + for server_id, current_status in self.server_status.iteritems(): + new_status = None + if server_id in domain_dict: + new_status = domain_dict[server_id] + else: + new_status = "INACTIVE" + + if new_status == None or new_status == current_status: + continue + if new_status == 'INACTIVE' and current_status == 'ERROR': + continue #keep ERROR status, because obviously this machine is not running + #change status + print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status + STATUS={'progress':100, 'status':new_status} + if new_status == 'ERROR': + STATUS['last_error'] = 'machine has crashed' + self.db_lock.acquire() + r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False) + self.db_lock.release() + if r>=0: + self.server_status[server_id] = new_status + + def action_on_server(self, req, last_retry=True): + '''Perform an action on a req + Attributes: + req: dictionary that contain: + server properties: 'uuid','name','tenant_id','status' + action: 'action' + host properties: 'user', 'ip_name' + return (error, text) + 0: No error. VM is updated to new state, + -1: Invalid action, as trying to pause a PAUSED VM + -2: Error accessing host + -3: VM nor present + -4: Error at DB access + -5: Error while trying to perform action. VM is updated to ERROR + ''' + server_id = req['uuid'] + conn = None + new_status = None + old_status = req['status'] + last_error = None + + if self.test: + if 'terminate' in req['action']: + new_status = 'deleted' + elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']: + if req['status']!='ERROR': + time.sleep(5) + new_status = 'INACTIVE' + elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE' + elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE' + elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED' + elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE' + elif 'rebuild' in req['action']: + time.sleep(random.randint(20,150)) + new_status = 'ACTIVE' + elif 'createImage' in req['action']: + time.sleep(5) + self.create_image(None, req) + else: + try: + conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + try: + dom = conn.lookupByUUIDString(server_id) + except host_thread.lvirt_module.libvirtError as e: + text = e.get_error_message() + if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text: + dom = None + else: + print self.name, ": action_on_server(",server_id,") libvirt exception:", text + raise e + + if 'forceOff' in req['action']: + if dom == None: + print self.name, ": action_on_server(",server_id,") domain not running" + else: + try: + print self.name, ": sending DESTROY to server", server_id + dom.destroy() + except Exception as e: + if "domain is not running" not in e.get_error_message(): + print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message() + last_error = 'action_on_server Exception while destroy: ' + e.get_error_message() + new_status = 'ERROR' + + elif 'terminate' in req['action']: + if dom == None: + print self.name, ": action_on_server(",server_id,") domain not running" + new_status = 'deleted' + else: + try: + if req['action']['terminate'] == 'force': + print self.name, ": sending DESTROY to server", server_id + dom.destroy() + new_status = 'deleted' + else: + print self.name, ": sending SHUTDOWN to server", server_id + dom.shutdown() + self.pending_terminate_server.append( (time.time()+10,server_id) ) + except Exception as e: + print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message() + last_error = 'action_on_server Exception while destroy: ' + e.get_error_message() + new_status = 'ERROR' + if "domain is not running" in e.get_error_message(): + try: + dom.undefine() + new_status = 'deleted' + except Exception: + print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message() + last_error = 'action_on_server Exception2 while undefine:', e.get_error_message() + #Exception: 'virDomainDetachDevice() failed' + if new_status=='deleted': + if server_id in self.server_status: + del self.server_status[server_id] + if req['uuid'] in self.localinfo['server_files']: + for file_ in self.localinfo['server_files'][ req['uuid'] ].values(): + try: + self.delete_file(file_['source file']) + except Exception: + pass + del self.localinfo['server_files'][ req['uuid'] ] + self.localinfo_dirty = True + + elif 'shutoff' in req['action'] or 'shutdown' in req['action']: + try: + if dom == None: + print self.name, ": action_on_server(",server_id,") domain not running" + else: + dom.shutdown() +# new_status = 'INACTIVE' + #TODO: check status for changing at database + except Exception as e: + new_status = 'ERROR' + print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message() + last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message() + + elif 'rebuild' in req['action']: + if dom != None: + dom.destroy() + r = self.launch_server(conn, req, True, None) + if r[0] <0: + new_status = 'ERROR' + last_error = r[1] + else: + new_status = 'ACTIVE' + elif 'start' in req['action']: + # The instance is only create in DB but not yet at libvirt domain, needs to be create + rebuild = True if req['action']['start'] == 'rebuild' else False + r = self.launch_server(conn, req, rebuild, dom) + if r[0] <0: + new_status = 'ERROR' + last_error = r[1] + else: + new_status = 'ACTIVE' + + elif 'resume' in req['action']: + try: + if dom == None: + pass + else: + dom.resume() +# new_status = 'ACTIVE' + except Exception as e: + print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message() + + elif 'pause' in req['action']: + try: + if dom == None: + pass + else: + dom.suspend() +# new_status = 'PAUSED' + except Exception as e: + print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message() + + elif 'reboot' in req['action']: + try: + if dom == None: + pass + else: + dom.reboot() + print self.name, ": action_on_server(",server_id,") reboot:" + #new_status = 'ACTIVE' + except Exception as e: + print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message() + elif 'createImage' in req['action']: + self.create_image(dom, req) + + + conn.close() + except host_thread.lvirt_module.libvirtError as e: + if conn is not None: conn.close() + text = e.get_error_message() + new_status = "ERROR" + last_error = text + print self.name, ": action_on_server(",server_id,") Exception '", text + if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text: + print self.name, ": action_on_server(",server_id,") Exception removed from host" + #end of if self.test + if new_status == None: + return 1 + + print self.name, ": action_on_server(",server_id,") new status", new_status, last_error + UPDATE = {'progress':100, 'status':new_status} + + if new_status=='ERROR': + if not last_retry: #if there will be another retry do not update database + return -1 + elif 'terminate' in req['action']: + #PUT a log in the database + print self.name, ": PANIC deleting server", server_id, last_error + self.db_lock.acquire() + self.db.new_row('logs', + {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic', + 'description':'PANIC deleting server from host '+self.name+': '+last_error} + ) + self.db_lock.release() + if server_id in self.server_status: + del self.server_status[server_id] + return -1 + else: + UPDATE['last_error'] = last_error + if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') : + self.db_lock.acquire() + self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True) + self.server_status[server_id] = new_status + self.db_lock.release() + if new_status == 'ERROR': + return -1 + return 1 + + + def restore_iface(self, name, mac, lib_conn=None): + ''' make an ifdown, ifup to restore default parameter of na interface + Params: + mac: mac address of the interface + lib_conn: connection to the libvirt, if None a new connection is created + Return 0,None if ok, -1,text if fails + ''' + conn=None + ret = 0 + error_text=None + if self.test: + print self.name, ": restore_iface '%s' %s" % (name, mac) + return 0, None + try: + if not lib_conn: + conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + else: + conn = lib_conn + + #wait to the pending VM deletion + #TODO.Revise self.server_forceoff(True) + + iface = conn.interfaceLookupByMACString(mac) + iface.destroy() + iface.create() + print self.name, ": restore_iface '%s' %s" % (name, mac) + except host_thread.lvirt_module.libvirtError as e: + error_text = e.get_error_message() + print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text) + ret=-1 + finally: + if lib_conn is None and conn is not None: + conn.close() + return ret, error_text + + + def create_image(self,dom, req): + if self.test: + if 'path' in req['action']['createImage']: + file_dst = req['action']['createImage']['path'] + else: + createImage=req['action']['createImage'] + img_name= createImage['source']['path'] + index=img_name.rfind('/') + file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2') + image_status='ACTIVE' + else: + for retry in (0,1): + try: + server_id = req['uuid'] + createImage=req['action']['createImage'] + file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file'] + if 'path' in req['action']['createImage']: + file_dst = req['action']['createImage']['path'] + else: + img_name= createImage['source']['path'] + index=img_name.rfind('/') + file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2') + + self.copy_file(file_orig, file_dst) + qemu_info = self.qemu_get_info(file_orig) + if 'backing file' in qemu_info: + for k,v in self.localinfo['files'].items(): + if v==qemu_info['backing file']: + self.qemu_change_backing(file_dst, k) + break + image_status='ACTIVE' + break + except paramiko.ssh_exception.SSHException as e: + image_status='ERROR' + error_text = e.args[0] + print self.name, "': create_image(",server_id,") ssh Exception:", error_text + if "SSH session not active" in error_text and retry==0: + self.ssh_connect() + except Exception as e: + image_status='ERROR' + error_text = str(e) + print self.name, "': create_image(",server_id,") Exception:", error_text + + #TODO insert a last_error at database + self.db_lock.acquire() + self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst}, + {'uuid':req['new_image']['uuid']}, log=True) + self.db_lock.release() + + def edit_iface(self, port_id, old_net, new_net): + #This action imply remove and insert interface to put proper parameters + if self.test: + time.sleep(1) + else: + #get iface details + self.db_lock.acquire() + r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id', + WHERE={'port_id': port_id}) + self.db_lock.release() + if r<0: + print self.name, ": edit_iface(",port_id,") DDBB error:", c + return + elif r==0: + print self.name, ": edit_iface(",port_id,") por not found" + return + port=c[0] + if port["model"]!="VF": + print self.name, ": edit_iface(",port_id,") ERROR model must be VF" + return + #create xml detach file + xml=[] + self.xml_level = 2 + xml.append("") + xml.append(" ") + xml.append(" "+ self.pci2xml(port['pci'])+"\n ") + xml.append('') + + + try: + conn=None + conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + dom = conn.lookupByUUIDString(port["instance_id"]) + if old_net: + text="\n".join(xml) + print self.name, ": edit_iface detaching SRIOV interface", text + dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE) + if new_net: + xml[-1] =" " + self.xml_level = 1 + xml.append(self.pci2xml(port.get('vpci',None)) ) + xml.append('') + text="\n".join(xml) + print self.name, ": edit_iface attaching SRIOV interface", text + dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE) + + except host_thread.lvirt_module.libvirtError as e: + text = e.get_error_message() + print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text + + finally: + if conn is not None: conn.close() + + +def create_server(server, db, db_lock, only_of_ports): + #print "server" + #print "server" + #print server + #print "server" + #print "server" + #try: +# host_id = server.get('host_id', None) + extended = server.get('extended', None) + +# print '----------------------' +# print json.dumps(extended, indent=4) + + requirements={} + requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]} + requirements['ram'] = server['flavor'].get('ram', 0) + if requirements['ram']== None: + requirements['ram'] = 0 + requirements['vcpus'] = server['flavor'].get('vcpus', 0) + if requirements['vcpus']== None: + requirements['vcpus'] = 0 + #If extended is not defined get requirements from flavor + if extended is None: + #If extended is defined in flavor convert to dictionary and use it + if 'extended' in server['flavor'] and server['flavor']['extended'] != None: + json_acceptable_string = server['flavor']['extended'].replace("'", "\"") + extended = json.loads(json_acceptable_string) + else: + extended = None + #print json.dumps(extended, indent=4) + + #For simplicity only one numa VM are supported in the initial implementation + if extended != None: + numas = extended.get('numas', []) + if len(numas)>1: + return (-2, "Multi-NUMA VMs are not supported yet") + #elif len(numas)<1: + # return (-1, "At least one numa must be specified") + + #a for loop is used in order to be ready to multi-NUMA VMs + request = [] + for numa in numas: + numa_req = {} + numa_req['memory'] = numa.get('memory', 0) + if 'cores' in numa: + numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved + numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved + numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads + elif 'paired-threads' in numa: + numa_req['proc_req_nb'] = numa['paired-threads'] + numa_req['proc_req_type'] = 'paired-threads' + numa_req['proc_req_list'] = numa.get('paired-threads-id', None) + elif 'threads' in numa: + numa_req['proc_req_nb'] = numa['threads'] + numa_req['proc_req_type'] = 'threads' + numa_req['proc_req_list'] = numa.get('threads-id', None) + else: + numa_req['proc_req_nb'] = 0 # by default + numa_req['proc_req_type'] = 'threads' + + + + #Generate a list of sriov and another for physical interfaces + interfaces = numa.get('interfaces', []) + sriov_list = [] + port_list = [] + for iface in interfaces: + iface['bandwidth'] = int(iface['bandwidth']) + if iface['dedicated'][:3]=='yes': + port_list.append(iface) + else: + sriov_list.append(iface) + + #Save lists ordered from more restrictive to less bw requirements + numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True) + numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True) + + + request.append(numa_req) + + # print "----------\n"+json.dumps(request[0], indent=4) + # print '----------\n\n' + + #Search in db for an appropriate numa for each requested numa + #at the moment multi-NUMA VMs are not supported + if len(request)>0: + requirements['numa'].update(request[0]) + if requirements['numa']['memory']>0: + requirements['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory + elif requirements['ram']==0: + return (-1, "Memory information not set neither at extended field not at ram") + if requirements['numa']['proc_req_nb']>0: + requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus + elif requirements['vcpus']==0: + return (-1, "Processor information not set neither at extended field not at vcpus") + + + db_lock.acquire() + result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports) + db_lock.release() + + if result == -1: + return (-1, content) + + numa_id = content['numa_id'] + host_id = content['host_id'] + + #obtain threads_id and calculate pinning + cpu_pinning = [] + reserved_threads=[] + if requirements['numa']['proc_req_nb']>0: + db_lock.acquire() + result, content = db.get_table(FROM='resources_core', + SELECT=('id','core_id','thread_id'), + WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} ) + db_lock.release() + if result <= 0: + print content + return -1, content + + #convert rows to a dictionary indexed by core_id + cores_dict = {} + for row in content: + if not row['core_id'] in cores_dict: + cores_dict[row['core_id']] = [] + cores_dict[row['core_id']].append([row['thread_id'],row['id']]) + + #In case full cores are requested + paired = 'N' + if requirements['numa']['proc_req_type'] == 'cores': + #Get/create the list of the vcpu_ids + vcpu_id_list = requirements['numa']['proc_req_list'] + if vcpu_id_list == None: + vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb'])) + + for threads in cores_dict.itervalues(): + #we need full cores + if len(threads) != 2: + continue + + #set pinning for the first thread + cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] ) + + #reserve so it is not used the second thread + reserved_threads.append(threads[1][1]) + + if len(vcpu_id_list) == 0: + break + + #In case paired threads are requested + elif requirements['numa']['proc_req_type'] == 'paired-threads': + paired = 'Y' + #Get/create the list of the vcpu_ids + if requirements['numa']['proc_req_list'] != None: + vcpu_id_list = [] + for pair in requirements['numa']['proc_req_list']: + if len(pair)!=2: + return -1, "Field paired-threads-id not properly specified" + return + vcpu_id_list.append(pair[0]) + vcpu_id_list.append(pair[1]) + else: + vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb'])) + + for threads in cores_dict.itervalues(): + #we need full cores + if len(threads) != 2: + continue + #set pinning for the first thread + cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]]) + + #set pinning for the second thread + cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]]) + + if len(vcpu_id_list) == 0: + break + + #In case normal threads are requested + elif requirements['numa']['proc_req_type'] == 'threads': + #Get/create the list of the vcpu_ids + vcpu_id_list = requirements['numa']['proc_req_list'] + if vcpu_id_list == None: + vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb'])) + + for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])): + threads = cores_dict[threads_index] + #set pinning for the first thread + cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]]) + + #if exists, set pinning for the second thread + if len(threads) == 2 and len(vcpu_id_list) != 0: + cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]]) + + if len(vcpu_id_list) == 0: + break + + #Get the source pci addresses for the selected numa + used_sriov_ports = [] + for port in requirements['numa']['sriov_list']: + db_lock.acquire() + result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} ) + db_lock.release() + if result <= 0: + print content + return -1, content + for row in content: + if row['id'] in used_sriov_ports or row['id']==port['port_id']: + continue + port['pci'] = row['pci'] + if 'mac_address' not in port: + port['mac_address'] = row['mac'] + del port['mac'] + port['port_id']=row['id'] + port['Mbps_used'] = port['bandwidth'] + used_sriov_ports.append(row['id']) + break + + for port in requirements['numa']['port_list']: + port['Mbps_used'] = None + if port['dedicated'] != "yes:sriov": + port['mac_address'] = port['mac'] + del port['mac'] + continue + db_lock.acquire() + result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} ) + db_lock.release() + if result <= 0: + print content + return -1, content + port['Mbps_used'] = content[0]['Mbps'] + for row in content: + if row['id'] in used_sriov_ports or row['id']==port['port_id']: + continue + port['pci'] = row['pci'] + if 'mac_address' not in port: + port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports + del port['mac'] + port['port_id']=row['id'] + used_sriov_ports.append(row['id']) + break + + # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4) + # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4) + + server['host_id'] = host_id + + + #Generate dictionary for saving in db the instance resources + resources = {} + resources['bridged-ifaces'] = [] + + numa_dict = {} + numa_dict['interfaces'] = [] + + numa_dict['interfaces'] += requirements['numa']['port_list'] + numa_dict['interfaces'] += requirements['numa']['sriov_list'] + + #Check bridge information + unified_dataplane_iface=[] + unified_dataplane_iface += requirements['numa']['port_list'] + unified_dataplane_iface += requirements['numa']['sriov_list'] + + for control_iface in server.get('networks', []): + control_iface['net_id']=control_iface.pop('uuid') + #Get the brifge name + db_lock.acquire() + result, content = db.get_table(FROM='nets', + SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp', + 'dhcp_first_ip', 'dhcp_last_ip', 'cidr'), + WHERE={'uuid': control_iface['net_id']}) + db_lock.release() + if result < 0: + pass + elif result==0: + return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id'] + else: + network=content[0] + if control_iface.get("type", 'virtual') == 'virtual': + if network['type']!='bridge_data' and network['type']!='bridge_man': + return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id'] + resources['bridged-ifaces'].append(control_iface) + if network.get("provider") and network["provider"][0:3] == "OVS": + control_iface["type"] = "instance:ovs" + else: + control_iface["type"] = "instance:bridge" + if network.get("vlan"): + control_iface["vlan"] = network["vlan"] + + if network.get("enable_dhcp") == 'true': + control_iface["enable_dhcp"] = network.get("enable_dhcp") + control_iface["dhcp_first_ip"] = network["dhcp_first_ip"] + control_iface["dhcp_last_ip"] = network["dhcp_last_ip"] + control_iface["cidr"] = network["cidr"] + else: + if network['type']!='data' and network['type']!='ptp': + return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id'] + #dataplane interface, look for it in the numa tree and asign this network + iface_found=False + for dataplane_iface in numa_dict['interfaces']: + if dataplane_iface['name'] == control_iface.get("name"): + if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \ + (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \ + (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") : + return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \ + (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"]) + dataplane_iface['uuid'] = control_iface['net_id'] + if dataplane_iface['dedicated'] == "no": + dataplane_iface['vlan'] = network['vlan'] + if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"): + dataplane_iface['mac_address'] = control_iface.get("mac_address") + if control_iface.get("vpci"): + dataplane_iface['vpci'] = control_iface.get("vpci") + iface_found=True + break + if not iface_found: + return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name") + + resources['host_id'] = host_id + resources['image_id'] = server['image_id'] + resources['flavor_id'] = server['flavor_id'] + resources['tenant_id'] = server['tenant_id'] + resources['ram'] = requirements['ram'] + resources['vcpus'] = requirements['vcpus'] + resources['status'] = 'CREATING' + + if 'description' in server: resources['description'] = server['description'] + if 'name' in server: resources['name'] = server['name'] + + resources['extended'] = {} #optional + resources['extended']['numas'] = [] + numa_dict['numa_id'] = numa_id + numa_dict['memory'] = requirements['numa']['memory'] + numa_dict['cores'] = [] + + for core in cpu_pinning: + numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired}) + for core in reserved_threads: + numa_dict['cores'].append({'id': core}) + resources['extended']['numas'].append(numa_dict) + if extended!=None and 'devices' in extended: #TODO allow extra devices without numa + resources['extended']['devices'] = extended['devices'] + + + print '===================================={' + print json.dumps(resources, indent=4) + print '====================================}' + + return 0, resources +