X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_openvim%2Fhost_thread.py;h=00d253e02408f3c93c6989c327ec8c9c13a65252;hb=caeb224da9840534e46ae0e8f5e194575521c668;hp=80d99864672fb3d66b2d5187969aba9b312151d0;hpb=47fa483a5a13bbfaf1dd5461b08a4402c4d3df03;p=osm%2Fopenvim.git diff --git a/osm_openvim/host_thread.py b/osm_openvim/host_thread.py index 80d9986..00d253e 100644 --- a/osm_openvim/host_thread.py +++ b/osm_openvim/host_thread.py @@ -34,20 +34,21 @@ import threading import time import Queue import paramiko -from jsonschema import validate as js_v, exceptions as js_e -#import libvirt +# import subprocess +# import libvirt import imp -from vim_schema import localinfo_schema, hostinfo_schema import random import os import logging +from jsonschema import validate as js_v, exceptions as js_e +from vim_schema import localinfo_schema, hostinfo_schema class host_thread(threading.Thread): lvirt_module = None def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, - develop_bridge_iface, logger_name=None, debug=None): + develop_bridge_iface, password=None, keyfile = None, logger_name=None, debug=None): '''Init a thread. Arguments: 'id' number of thead @@ -62,6 +63,8 @@ class host_thread(threading.Thread): self.db = db self.db_lock = db_lock self.test = test + self.password = password + self.keyfile = keyfile self.localinfo_dirty = False if not test and not host_thread.lvirt_module: @@ -82,6 +85,7 @@ class host_thread(threading.Thread): self.develop_mode = develop_mode self.develop_bridge_iface = develop_bridge_iface self.image_path = image_path + self.empty_image_path = image_path self.host_id = host_id self.version = version @@ -97,33 +101,38 @@ class host_thread(threading.Thread): self.queueLock = threading.Lock() self.taskQueue = Queue.Queue(2000) self.ssh_conn = None + self.lvirt_conn_uri = "qemu+ssh://{user}@{host}/system?no_tty=1&no_verify=1".format( + user=self.user, host=self.host) + if keyfile: + self.lvirt_conn_uri += "&keyfile=" + keyfile def ssh_connect(self): try: - #Connect SSH + # Connect SSH self.ssh_conn = paramiko.SSHClient() self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_conn.load_system_host_keys() - self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None) + self.ssh_conn.connect(self.host, username=self.user, password=self.password, key_filename=self.keyfile, + timeout=10) #, None) except paramiko.ssh_exception.SSHException as e: text = e.args[0] self.logger.error("ssh_connect ssh Exception: " + text) - + def load_localinfo(self): if not self.test: try: - #Connect SSH + # Connect SSH self.ssh_connect() - + command = 'mkdir -p ' + self.image_path - #print self.name, ': command:', command + # print self.name, ': command:', command (_, stdout, stderr) = self.ssh_conn.exec_command(command) content = stderr.read() if len(content) > 0: self.logger.error("command: '%s' stderr: '%s'", command, content) command = 'cat ' + self.image_path + '/.openvim.yaml' - #print self.name, ': command:', command + # print self.name, ': command:', command (_, stdout, stderr) = self.ssh_conn.exec_command(command) content = stdout.read() if len(content) == 0: @@ -136,7 +145,7 @@ class host_thread(threading.Thread): self.localinfo['server_files'] = {} self.logger.debug("localinfo load from host") return - + except paramiko.ssh_exception.SSHException as e: text = e.args[0] self.logger.error("load_localinfo ssh Exception: " + text) @@ -214,13 +223,14 @@ class host_thread(threading.Thread): tries-=1 try: - command = 'cat > ' + self.image_path + '/.openvim.yaml' + command = 'cat > ' + self.image_path + '/.openvim.yaml' self.logger.debug("command:" + command) (stdin, _, _) = self.ssh_conn.exec_command(command) yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True) + self.localinfo_dirty = False break #while tries - + except paramiko.ssh_exception.SSHException as e: text = e.args[0] self.logger.error("save_localinfo ssh Exception: " + text) @@ -427,8 +437,7 @@ class host_thread(threading.Thread): if topo == None and 'metadata' in dev_list[0]: topo = dev_list[0]['metadata'].get('topology', None) #name - name = server.get('name','') + "_" + server['uuid'] - name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58 + name = server.get('name', '')[:28] + "_" + server['uuid'][:28] #qemu impose a length limit of 59 chars or not start. Using 58 text += self.inc_tab() + "" + name+ "" #uuid text += self.tab() + "" + server['uuid'] + "" @@ -511,7 +520,7 @@ class host_thread(threading.Thread): if topo == "oneSocket:hyperthreading": if vcpus % 2 != 0: return -1, 'Cannot expose hyperthreading with an odd number of vcpus' - text += self.tab() + " " % vcpus/2 + text += self.tab() + " " % (vcpus/2) elif windows_os or topo == "oneSocket": text += self.tab() + " " % vcpus else: @@ -567,7 +576,7 @@ class host_thread(threading.Thread): #else: # return -1, 'Unknown disk type ' + v['type'] vpci = dev.get('vpci',None) - if vpci == None: + if vpci == None and 'metadata' in dev: vpci = dev['metadata'].get('vpci',None) text += self.pci2xml(vpci) @@ -813,7 +822,7 @@ class host_thread(threading.Thread): self.db_lock.acquire() result, content = self.db.get_table( FROM='ports', - WHERE={'p.type': 'instance:ovs', 'p.net_id': net_uuid} + WHERE={'type': 'instance:ovs', 'net_id': net_uuid} ) self.db_lock.release() @@ -1215,6 +1224,11 @@ class host_thread(threading.Thread): (_, stdout, _) = self.ssh_conn.exec_command(command) content = stdout.read() + command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev lo up' + self.logger.debug("command: " + command) + (_, stdout, _) = self.ssh_conn.exec_command(command) + content = stdout.read() + command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \ + ' ' + ip_listen_address + ' netmask ' + netmask self.logger.debug("command: " + command) @@ -1345,6 +1359,24 @@ class host_thread(threading.Thread): else: self.logger.error("qemu_change_backing error: " + content) return -1 + + def qemu_create_empty_disk(self, dev): + + if not dev and 'source' not in dev and 'file format' not in dev and 'image_size' not in dev: + self.logger.error("qemu_create_empty_disk error: missing image parameter") + return -1 + + empty_disk_path = dev['source file'] + + command = 'qemu-img create -f qcow2 ' + empty_disk_path + ' ' + str(dev['image_size']) + 'G' + self.logger.debug("command: " + command) + (_, _, stderr) = self.ssh_conn.exec_command(command) + content = stderr.read() + if len(content) == 0: + return 0 + else: + self.logger.error("qemu_create_empty_disk error: " + content) + return -1 def get_notused_filename(self, proposed_name, suffix=''): '''Look for a non existing file_name in the host @@ -1516,31 +1548,45 @@ class host_thread(threading.Thread): devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ] if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']: devices += server_data['extended']['devices'] - + empty_path = None for dev in devices: - if dev['image_id'] == None: + image_id = dev.get('image_id') + if not image_id: + import uuid + uuid_empty = str(uuid.uuid4()) + empty_path = self.empty_image_path + uuid_empty + '.qcow2' # local path for empty disk + + dev['source file'] = empty_path + dev['file format'] = 'qcow2' + self.qemu_create_empty_disk(dev) + server_host_files[uuid_empty] = {'source file': empty_path, + 'file format': dev['file format']} + continue - - self.db_lock.acquire() - result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'), - WHERE={'uuid': dev['image_id']}) - self.db_lock.release() - if result <= 0: - error_text = "ERROR", result, content, "when getting image", dev['image_id'] - self.logger.error("launch_server " + error_text) - return -1, error_text - if content[0]['metadata'] is not None: - dev['metadata'] = json.loads(content[0]['metadata']) else: - dev['metadata'] = {} - - if dev['image_id'] in server_host_files: - dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path - dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2 - continue + self.db_lock.acquire() + result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'), + WHERE={'uuid': image_id}) + self.db_lock.release() + if result <= 0: + error_text = "ERROR", result, content, "when getting image", dev['image_id'] + self.logger.error("launch_server " + error_text) + return -1, error_text + if content[0]['metadata'] is not None: + dev['metadata'] = json.loads(content[0]['metadata']) + else: + dev['metadata'] = {} + + if image_id in server_host_files: + dev['source file'] = server_host_files[image_id]['source file'] #local path + dev['file format'] = server_host_files[image_id]['file format'] # raw or qcow2 + continue #2: copy image to host - remote_file = content[0]['path'] + if image_id: + remote_file = content[0]['path'] + else: + remote_file = empty_path use_incremental_image = use_incremental if dev['metadata'].get("use_incremental") == "no": use_incremental_image = False @@ -1605,12 +1651,12 @@ class host_thread(threading.Thread): # VIR_DOMAIN_SHUTOFF = 5 # VIR_DOMAIN_CRASHED = 6 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended - + if self.test or len(self.server_status)==0: - return - + return + try: - conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + conn = host_thread.lvirt_module.open(self.lvirt_conn_uri) domains= conn.listAllDomains() domain_dict={} for domain in domains: @@ -1699,7 +1745,7 @@ class host_thread(threading.Thread): self.create_image(None, req) else: try: - conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + conn = host_thread.lvirt_module.open(self.lvirt_conn_uri) try: dom = conn.lookupByUUIDString(server_id) except host_thread.lvirt_module.libvirtError as e: @@ -1893,7 +1939,7 @@ class host_thread(threading.Thread): return 0, None try: if not lib_conn: - conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + conn = host_thread.lvirt_module.open(self.lvirt_conn_uri) else: conn = lib_conn @@ -1990,12 +2036,12 @@ class host_thread(threading.Thread): xml.append("") xml.append(" ") xml.append(" "+ self.pci2xml(port['pci'])+"\n ") - xml.append('') + xml.append('') try: conn=None - conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system") + conn = host_thread.lvirt_module.open(self.lvirt_conn_uri) dom = conn.lookupByUUIDString(port["instance_id"]) if old_net: text="\n".join(xml)