From: tierno Date: Mon, 3 Jul 2017 15:04:54 +0000 (+0200) Subject: Unify ssh_command. Allow remote ssh with paramiko and localhost with subprocess X-Git-Tag: v3.0.0rc14~6 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fopenvim.git;a=commitdiff_plain;h=c67abe2ff35fcdd5b85db97915a7833ab31beb56 Unify ssh_command. Allow remote ssh with paramiko and localhost with subprocess Get real IP address from name for the vxlan remote_ip Change-Id: I8363654e6402b81c2e7ac513ab32e6a5aa325b56 Signed-off-by: tierno --- diff --git a/openvimd b/openvimd index b50187d..b4f6283 100755 --- a/openvimd +++ b/openvimd @@ -232,11 +232,11 @@ if __name__ == "__main__": exit(1) if config_dic['network_type'] == 'ovs' \ - and config_dic['ovs_controller_ip'] == 'localhost' \ - and not (config_dic['mode'] == 'test' or config_dic['mode'] == "OF only"): + and config_dic['ovs_controller_ip'][:4] == '127.': + # and not (config_dic['mode'] == 'test' or config_dic['mode'] == "OF only"): - error_msg = "Error: invalid value '{}' for ovs_controller_ip at {}. " \ - "Use a valid IP address".format(config_dic['ovs_controller_ip'], config_file) + error_msg = "Error: invalid value '{}' for ovs_controller_ip at {}. Use 'localhost' word instead "\ + "of a loopback IP address".format(config_dic['ovs_controller_ip'], config_file) print ("!! {} ".format(error_msg)) logger.error(error_msg) diff --git a/osm_openvim/host_thread.py b/osm_openvim/host_thread.py index 6fe8331..6c3c7e2 100644 --- a/osm_openvim/host_thread.py +++ b/osm_openvim/host_thread.py @@ -34,7 +34,7 @@ import threading import time import Queue import paramiko -# import subprocess +import subprocess # import libvirt import imp import random @@ -43,19 +43,21 @@ import logging from jsonschema import validate as js_v, exceptions as js_e from vim_schema import localinfo_schema, hostinfo_schema +class RunCommandException(Exception): + pass class host_thread(threading.Thread): lvirt_module = None def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, develop_bridge_iface, password=None, keyfile = None, logger_name=None, debug=None): - '''Init a thread. - Arguments: - 'id' number of thead - 'name' name of thread - 'host','user': host ip or name to manage and user - 'db', 'db_lock': database class and lock to use it in exclusion - ''' + """Init a thread to communicate with compute node or ovs_controller. + :param host_id: host identity + :param name: name of the thread + :param host: host ip or name to manage and user + :param user, password, keyfile: user and credentials to connect to host + :param db, db_lock': database class and lock to use it in exclusion + """ threading.Thread.__init__(self) self.name = name self.host = host @@ -64,7 +66,7 @@ class host_thread(threading.Thread): self.db_lock = db_lock self.test = test self.password = password - self.keyfile = keyfile + self.keyfile = keyfile self.localinfo_dirty = False if not test and not host_thread.lvirt_module: @@ -90,7 +92,7 @@ class host_thread(threading.Thread): self.version = version self.xml_level = 0 - #self.pending ={} + # self.pending ={} self.server_status = {} #dictionary with pairs server_uuid:server_status self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed @@ -101,11 +103,76 @@ class host_thread(threading.Thread): self.queueLock = threading.Lock() self.taskQueue = Queue.Queue(2000) self.ssh_conn = None - self.connectivity = True + self.run_command_session = None + self.error = None + self.localhost = True if host == 'localhost' else False self.lvirt_conn_uri = "qemu+ssh://{user}@{host}/system?no_tty=1&no_verify=1".format( user=self.user, host=self.host) if keyfile: self.lvirt_conn_uri += "&keyfile=" + keyfile + self.remote_ip = None + self.local_ip = None + + def run_command(self, command, keep_session=False): + """Run a command passed as a str on a localhost or at remote machine. + :param command: text with the command to execute. + :param keep_session: if True it returns a for sending input with '.write("text\n")'. + A command with keep_session=True MUST be followed by a command with keep_session=False in order to + close the session and get the output + :return: the output of the command if 'keep_session=False' or the object if 'keep_session=True' + :raises: RunCommandException if command fails + """ + if self.run_command_session and keep_session: + raise RunCommandException("Internal error. A command with keep_session=True must be followed by another " + "command with keep_session=False to close session") + try: + if self.localhost: + if self.run_command_session: + p = self.run_command_session + self.run_command_session = None + (output, outerror) = p.communicate() + returncode = p.returncode + p.stdin.close() + elif keep_session: + p = subprocess.Popen(('bash', "-c", command), stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.run_command_session = p + return p.stdin + else: + output = subprocess.check_output(('bash', "-c", command)) + returncode = 0 + else: + if self.run_command_session: + (i, o, e) = self.run_command_session + self.run_command_session = None + i.channel.shutdown_write() + else: + if not self.ssh_conn: + self.ssh_connect() + (i, o, e) = self.ssh_conn.exec_command(command, timeout=10) + if keep_session: + self.run_command_session = (i, o, e) + return i + returncode = o.channel.recv_exit_status() + output = o.read() + outerror = e.read() + if returncode != 0: + text = "run_command='{}' Error='{}'".format(command, outerror) + self.logger.error(text) + raise RunCommandException(text) + + self.logger.debug("run_command='{}' result='{}'".format(command, output)) + return output + + except RunCommandException: + raise + except subprocess.CalledProcessError as e: + text = "run_command Exception '{}' '{}'".format(str(e), e.output) + except (paramiko.ssh_exception.SSHException, Exception) as e: + text = "run_command='{}' Exception='{}'".format(command, str(e)) + self.ssh_conn = None + self.run_command_session = None + raise RunCommandException(text) def ssh_connect(self): try: @@ -114,14 +181,18 @@ class host_thread(threading.Thread): self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_conn.load_system_host_keys() self.ssh_conn.connect(self.host, username=self.user, password=self.password, key_filename=self.keyfile, - timeout=10) #, None) - except paramiko.ssh_exception.SSHException as e: - text = e.args[0] - self.logger.error("ssh_connect ssh Exception: " + text) + timeout=10) # auth_timeout=10) + self.remote_ip = self.ssh_conn.get_transport().sock.getpeername()[0] + self.local_ip = self.ssh_conn.get_transport().sock.getsockname()[0] + except (paramiko.ssh_exception.SSHException, Exception) as e: + text = 'ssh connect Exception: {}'.format(e) + self.ssh_conn = None + self.error = text + raise def check_connectivity(self): if not self.test: - + # TODO change to run_command try: if not self.ssh_conn: self.ssh_connect() @@ -144,34 +215,17 @@ class host_thread(threading.Thread): def load_localinfo(self): if not self.test: try: - # Connect SSH - self.ssh_connect() - - command = 'mkdir -p ' + self.image_path - # print self.name, ': command:', command - (_, stdout, stderr) = self.ssh_conn.exec_command(command) - content = stderr.read() - if len(content) > 0: - self.logger.error("command: '%s' stderr: '%s'", command, content) - - command = 'cat ' + self.image_path + '/.openvim.yaml' - # print self.name, ': command:', command - (_, stdout, stderr) = self.ssh_conn.exec_command(command) - content = stdout.read() - if len(content) == 0: - self.logger.error("command: '%s' stderr='%s'", command, stderr.read()) - raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command)) - self.localinfo = yaml.load(content) + self.run_command('sudo mkdir -p ' + self.image_path) + result = self.run_command('cat {}/.openvim.yaml'.format(self.image_path)) + self.localinfo = yaml.load(result) js_v(self.localinfo, localinfo_schema) self.localinfo_dirty = False if 'server_files' not in self.localinfo: self.localinfo['server_files'] = {} - self.logger.debug("localinfo load from host") + self.logger.debug("localinfo loaded from host") return - - except paramiko.ssh_exception.SSHException as e: - text = e.args[0] - self.logger.error("load_localinfo ssh Exception: " + text) + except RunCommandException as e: + self.logger.error("load_localinfo Exception: " + str(e)) except host_thread.lvirt_module.libvirtError as e: text = e.get_error_message() self.logger.error("load_localinfo libvirt Exception: " + text) @@ -189,34 +243,22 @@ class host_thread(threading.Thread): text = str(e) self.logger.error("load_localinfo Exception: " + text) - #not loaded, insert a default data and force saving by activating dirty flag + # not loaded, insert a default data and force saving by activating dirty flag self.localinfo = {'files':{}, 'server_files':{} } - #self.localinfo_dirty=True + # self.localinfo_dirty=True self.localinfo_dirty=False def load_hostinfo(self): if self.test: - return; + return try: - #Connect SSH - self.ssh_connect() - - - command = 'cat ' + self.image_path + '/hostinfo.yaml' - #print self.name, ': command:', command - (_, stdout, stderr) = self.ssh_conn.exec_command(command) - content = stdout.read() - if len(content) == 0: - self.logger.error("command: '%s' stderr: '%s'", command, stderr.read()) - raise paramiko.ssh_exception.SSHException("Error empty file ") - self.hostinfo = yaml.load(content) + result = self.run_command('cat {}/hostinfo.yaml'.format(self.image_path)) + self.hostinfo = yaml.load(result) js_v(self.hostinfo, hostinfo_schema) - self.logger.debug("hostlinfo load from host " + str(self.hostinfo)) + self.logger.debug("hostinfo load from host " + str(self.hostinfo)) return - - except paramiko.ssh_exception.SSHException as e: - text = e.args[0] - self.logger.error("load_hostinfo ssh Exception: " + text) + except RunCommandException as e: + self.logger.error("load_hostinfo ssh Exception: " + str(e)) except host_thread.lvirt_module.libvirtError as e: text = e.get_error_message() self.logger.error("load_hostinfo libvirt Exception: " + text) @@ -229,7 +271,7 @@ class host_thread(threading.Thread): except js_e.ValidationError as e: text = "" if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'" - self.logger.error("load_hostinfo format Exception: %s %s", text, e.message) + self.logger.error("load_hostinfo format Exception: %s %s", text, str(e)) except Exception as e: text = str(e) self.logger.error("load_hostinfo Exception: " + text) @@ -246,19 +288,17 @@ class host_thread(threading.Thread): tries-=1 try: - command = 'cat > ' + self.image_path + '/.openvim.yaml' - self.logger.debug("command:" + command) - (stdin, _, _) = self.ssh_conn.exec_command(command) - yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True) + command = 'cat > {}/.openvim.yaml'.format(self.image_path) + in_stream = self.run_command(command, keep_session=True) + yaml.safe_dump(self.localinfo, in_stream, explicit_start=True, indent=4, default_flow_style=False, + tags=False, encoding='utf-8', allow_unicode=True) + result = self.run_command(command, keep_session=False) # to end session self.localinfo_dirty = False break #while tries - except paramiko.ssh_exception.SSHException as e: - text = e.args[0] - self.logger.error("save_localinfo ssh Exception: " + text) - if "SSH session not active" in text: - self.ssh_connect() + except RunCommandException as e: + self.logger.error("save_localinfo ssh Exception: " + str(e)) except host_thread.lvirt_module.libvirtError as e: text = e.get_error_message() self.logger.error("save_localinfo libvirt Exception: " + text) @@ -308,7 +348,7 @@ class host_thread(threading.Thread): try: self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid) self.delete_file(localfile['source file']) - except paramiko.ssh_exception.SSHException as e: + except RunCommandException as e: self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e)) del self.localinfo['server_files'][uuid] self.localinfo_dirty = True @@ -760,15 +800,10 @@ class host_thread(threading.Thread): try: - command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true' - self.logger.debug("command: " + command) - (_, stdout, _) = self.ssh_conn.exec_command(command) - content = stdout.read() - if len(content) == 0: - return True - else: - return False - except paramiko.ssh_exception.SSHException as e: + self.run_command('sudo', 'ovs-vsctl', '--may-exist', 'add-br', 'br-int', '--', 'set', 'Bridge', 'br-int', + 'stp_enable=true') + return True + except RunCommandException as e: self.logger.error("create_ovs_bridge ssh Exception: " + str(e)) if "SSH session not active" in str(e): self.ssh_connect() @@ -787,17 +822,10 @@ class host_thread(threading.Thread): try: port_name = 'ovim-' + str(vlan) command = 'sudo ovs-vsctl del-port br-int ' + port_name - self.logger.debug("command: " + command) - (_, stdout, _) = self.ssh_conn.exec_command(command) - content = stdout.read() - if len(content) == 0: - return True - else: - return False - except paramiko.ssh_exception.SSHException as e: + self.run_command(command) + return True + except RunCommandException as e: self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e)) - if "SSH session not active" in str(e): - self.ssh_connect() return False def delete_dhcp_server(self, vlan, net_uuid, dhcp_path): @@ -1656,6 +1684,10 @@ class host_thread(threading.Thread): """ if self.test or not self.connectivity: return True + if remote_ip == 'localhost': + if self.localhost: + return # TODO: Cannot create a vxlan between localhost and localhost + remote_ip = self.local_ip try: command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \ ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \ @@ -1722,43 +1754,32 @@ class host_thread(threading.Thread): def get_file_info(self, path): command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path - self.logger.debug("command: " + command) - (_, stdout, _) = self.ssh_conn.exec_command(command) - content = stdout.read() - if len(content) == 0: - return None # file does not exist - else: + try: + content = self.run_command(command) return content.split(" ") # (permission, 1, owner, group, size, date, file) + except RunCommandException as e: + return None # file does not exist def qemu_get_info(self, path): command = 'qemu-img info ' + path - self.logger.debug("command: " + command) - (_, stdout, stderr) = self.ssh_conn.exec_command(command) - content = stdout.read() - if len(content) == 0: - error = stderr.read() - self.logger.error("get_qemu_info error " + error) - raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error) - else: - try: - return yaml.load(content) - except yaml.YAMLError as exc: - text = "" - if hasattr(exc, 'problem_mark'): - mark = exc.problem_mark - text = " at position: (%s:%s)" % (mark.line+1, mark.column+1) - self.logger.error("get_qemu_info yaml format Exception " + text) - raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text) + content = self.run_command(command) + try: + return yaml.load(content) + except yaml.YAMLError as exc: + text = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + text = " at position: (%s:%s)" % (mark.line+1, mark.column+1) + self.logger.error("get_qemu_info yaml format Exception " + text) + raise RunCommandException("Error getting qemu_info yaml format" + text) def qemu_change_backing(self, inc_file, new_backing_file): - command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file - self.logger.debug("command: " + command) - (_, _, stderr) = self.ssh_conn.exec_command(command) - content = stderr.read() - if len(content) == 0: + command = 'qemu-img rebase -u -b {} {}'.format(new_backing_file, inc_file) + try: + self.run_command(command) return 0 - else: - self.logger.error("qemu_change_backing error: " + content) + except RunCommandException as e: + self.logger.error("qemu_change_backing error: " + str(e)) return -1 def qemu_create_empty_disk(self, dev): @@ -1769,14 +1790,12 @@ class host_thread(threading.Thread): empty_disk_path = dev['source file'] - command = 'qemu-img create -f qcow2 ' + empty_disk_path + ' ' + str(dev['image_size']) + 'G' - self.logger.debug("command: " + command) - (_, _, stderr) = self.ssh_conn.exec_command(command) - content = stderr.read() - if len(content) == 0: + command = 'qemu-img create -f qcow2 {} {}G'.format(empty_disk_path, dev['image_size']) + try: + self.run_command(command) return 0 - else: - self.logger.error("qemu_create_empty_disk error: " + content) + except RunCommandException as e: + self.logger.error("qemu_create_empty_disk error: " + str(e)) return -1 def get_notused_filename(self, proposed_name, suffix=''): @@ -1820,12 +1839,8 @@ class host_thread(threading.Thread): def delete_file(self, file_name): - command = 'rm -f '+file_name - self.logger.debug("command: " + command) - (_, _, stderr) = self.ssh_conn.exec_command(command) - error_msg = stderr.read() - if len(error_msg) > 0: - raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg) + command = 'rm -f ' + file_name + self.run_command(command) def copy_file(self, source, destination, perserve_time=True): if source[0:4]=="http": @@ -1835,12 +1850,8 @@ class host_thread(threading.Thread): command = 'cp --no-preserve=mode' if perserve_time: command += ' --preserve=timestamps' - command += " '{}' '{}'".format(source, destination) - self.logger.debug("command: " + command) - (_, _, stderr) = self.ssh_conn.exec_command(command) - error_msg = stderr.read() - if len(error_msg) > 0: - raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg) + command += " '{}' '{}'".format(source, destination) + self.run_command(command) def copy_remote_file(self, remote_file, use_incremental): ''' Copy a file from the repository to local folder and recursively @@ -1996,14 +2007,10 @@ class host_thread(threading.Thread): #create incremental image if use_incremental_image: local_file_inc = self.get_notused_filename(local_file, '.inc') - command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file - self.logger.debug("command: " + command) - (_, _, stderr) = self.ssh_conn.exec_command(command) - error_msg = stderr.read() - if len(error_msg) > 0: - raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg) + command = 'qemu-img create -f qcow2 {} -o backing_file={}'.format(local_file_inc, local_file) + self.run_command(command) local_file = local_file_inc - qemu_info = {'file format':'qcow2'} + qemu_info = {'file format': 'qcow2'} server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']} diff --git a/osm_openvim/httpserver.py b/osm_openvim/httpserver.py index e95c820..4437d96 100644 --- a/osm_openvim/httpserver.py +++ b/osm_openvim/httpserver.py @@ -38,6 +38,7 @@ import datetime import hashlib import os import imp +import socket from netaddr import IPNetwork, IPAddress, all_matching_cidrs #import only if needed because not needed in test mode. To allow an easier installation import RADclass from jsonschema import validate as js_v, exceptions as js_e @@ -637,6 +638,11 @@ def http_post_hosts(): for numa in host.get("numas", ()): if "hugepages_consumed" in numa: del numa["hugepages_consumed"] + for core in numa.get("cores", ()): + if "instance_id" in core: + del core["instance_id"] + if "v_thread_id" in core: + del core["v_thread_id"] result, content = my.db.new_host(host) if result >= 0: if content['admin_state_up']: @@ -660,7 +666,7 @@ def http_post_hosts(): create_dhcp_ovs_bridge() config_dic['host_threads'][content['uuid']].insert_task("new-ovsbridge") # create vlxan bwt OVS controller and computes - create_vxlan_mesh(content['uuid']) + create_vxlan_mesh(content['uuid'], my.logger) # return host data change_keys_http2db(content, http2db_host, reverse=True) @@ -744,12 +750,12 @@ def delete_mac_dhcp(vm_ip, vlan, mac): dhcp_controller.delete_mac_dhcp_server(vm_ip, mac, vlan, dhcp_path) -def create_vxlan_mesh(host_id): +def create_vxlan_mesh(host_id, logger=None): """ Create vxlan mesh across all openvimc controller and computes. - :param host_id: host identifier - :param host_id: host identifier - :return: + :param host_id: Added compute node id. Anyway vlan is created by all compute nodes + :param logger: To log errors + :return: None """ dhcp_compute_name = get_vxlan_interface("dhcp") existing_hosts = get_hosts() @@ -761,22 +767,27 @@ def create_vxlan_mesh(host_id): dhcp_controller = http_controller.ovim.get_dhcp_controller() for compute in computes_available: + try: + if compute['ip_name'] != 'localhost': + remote_ip = socket.gethostbyname(compute['ip_name']) + else: + remote_ip = 'localhost' + except socket.error as e: + if logger: + logger.error("Cannot get compute node remote ip from '{}'. Skipping: {}".format( + compute['ip_name'], e)) + continue + # vxlan ovs_controller <=> compute node vxlan_interface_name = get_vxlan_interface(compute['id'][:8]) config_dic['host_threads'][compute['id']].insert_task("new-vxlan", dhcp_compute_name, dhcp_controller.host) - dhcp_controller.create_ovs_vxlan_tunnel(vxlan_interface_name, compute['ip_name']) - - # vlxan mesh creation between openvim computes - for count, compute_owner in enumerate(computes_available): - for compute in computes_available: - if compute_owner['id'] == compute['id']: - pass - else: - vxlan_interface_name = get_vxlan_interface(compute_owner['id'][:8]) - dhcp_controller.create_ovs_vxlan_tunnel(vxlan_interface_name, compute_owner['ip_name']) - config_dic['host_threads'][compute['id']].insert_task("new-vxlan", - vxlan_interface_name, - compute_owner['ip_name']) - + dhcp_controller.create_ovs_vxlan_tunnel(vxlan_interface_name, remote_ip) + # vxlan from others compute node to cthis ompute node + for compute_src in computes_available: + if compute_src['id'] == compute['id']: + continue + config_dic['host_threads'][compute_src['id']].insert_task("new-vxlan", + vxlan_interface_name, + remote_ip) def delete_vxlan_mesh(host_id): """ @@ -847,7 +858,7 @@ def http_put_host_id(host_id): if config_dic['network_type'] == 'ovs': # create mesh with new host data config_dic['host_threads'][host_id].insert_task("new-ovsbridge") - create_vxlan_mesh(host_id) + create_vxlan_mesh(host_id, my.logger) #print data return format_out(data) diff --git a/osm_openvim/vim_schema.py b/osm_openvim/vim_schema.py index 4689a89..9440872 100644 --- a/osm_openvim/vim_schema.py +++ b/osm_openvim/vim_schema.py @@ -298,7 +298,9 @@ host_data_schema={ "properties": { "core_id": integer0_schema, "thread_id": integer0_schema, - "status": {"type": "string", "enum": ["noteligible"]} + "status": {"type": "string", "enum": ["noteligible"]}, + "instance_id": {"type": "string"}, # ignored, just for compatibility with host-list + "v_thread_id": {"type": "integer"} # ignored, just for compatibility with host-list }, "additionalProperties": False, "required": ["core_id", "thread_id"] @@ -344,7 +346,7 @@ host_data_schema={ } }, "additionalProperties": False, - "required": ["name", "ip_name"] + "required": ["name", "user", "ip_name"] } host_edit_schema={