import time
import Queue
import paramiko
-# import subprocess
+import subprocess
# import libvirt
import imp
import random
from jsonschema import validate as js_v, exceptions as js_e
from vim_schema import localinfo_schema, hostinfo_schema
+class RunCommandException(Exception):
+ pass
class host_thread(threading.Thread):
lvirt_module = None
def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
develop_bridge_iface, password=None, keyfile = None, logger_name=None, debug=None):
- '''Init a thread.
- Arguments:
- 'id' number of thead
- 'name' name of thread
- 'host','user': host ip or name to manage and user
- 'db', 'db_lock': database class and lock to use it in exclusion
- '''
+ """Init a thread to communicate with compute node or ovs_controller.
+ :param host_id: host identity
+ :param name: name of the thread
+ :param host: host ip or name to manage and user
+ :param user, password, keyfile: user and credentials to connect to host
+ :param db, db_lock': database class and lock to use it in exclusion
+ """
threading.Thread.__init__(self) = name = host
self.db_lock = db_lock
self.test = test
self.password = password
- self.keyfile = keyfile
+ self.keyfile = keyfile
self.localinfo_dirty = False
if not test and not host_thread.lvirt_module:
self.version = version
self.xml_level = 0
- #self.pending ={}
+ # self.pending ={}
self.server_status = {} #dictionary with pairs server_uuid:server_status
self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
self.queueLock = threading.Lock()
self.taskQueue = Queue.Queue(2000)
self.ssh_conn = None
- self.connectivity = True
+ self.run_command_session = None
+ self.error = None
+ self.localhost = True if host == 'localhost' else False
self.lvirt_conn_uri = "qemu+ssh://{user}@{host}/system?no_tty=1&no_verify=1".format(
if keyfile:
self.lvirt_conn_uri += "&keyfile=" + keyfile
+ self.remote_ip = None
+ self.local_ip = None
+ def run_command(self, command, keep_session=False):
+ """Run a command passed as a str on a localhost or at remote machine.
+ :param command: text with the command to execute.
+ :param keep_session: if True it returns a <stdin> for sending input with '<stdin>.write("text\n")'.
+ A command with keep_session=True MUST be followed by a command with keep_session=False in order to
+ close the session and get the output
+ :return: the output of the command if 'keep_session=False' or the <stdin> object if 'keep_session=True'
+ :raises: RunCommandException if command fails
+ """
+ if self.run_command_session and keep_session:
+ raise RunCommandException("Internal error. A command with keep_session=True must be followed by another "
+ "command with keep_session=False to close session")
+ try:
+ if self.localhost:
+ if self.run_command_session:
+ p = self.run_command_session
+ self.run_command_session = None
+ (output, outerror) = p.communicate()
+ returncode = p.returncode
+ p.stdin.close()
+ elif keep_session:
+ p = subprocess.Popen(('bash', "-c", command), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.run_command_session = p
+ return p.stdin
+ else:
+ output = subprocess.check_output(('bash', "-c", command))
+ returncode = 0
+ else:
+ if self.run_command_session:
+ (i, o, e) = self.run_command_session
+ self.run_command_session = None
+ else:
+ if not self.ssh_conn:
+ self.ssh_connect()
+ (i, o, e) = self.ssh_conn.exec_command(command, timeout=10)
+ if keep_session:
+ self.run_command_session = (i, o, e)
+ return i
+ returncode =
+ output =
+ outerror =
+ if returncode != 0:
+ text = "run_command='{}' Error='{}'".format(command, outerror)
+ self.logger.error(text)
+ raise RunCommandException(text)
+ self.logger.debug("run_command='{}' result='{}'".format(command, output))
+ return output
+ except RunCommandException:
+ raise
+ except subprocess.CalledProcessError as e:
+ text = "run_command Exception '{}' '{}'".format(str(e), e.output)
+ except (paramiko.ssh_exception.SSHException, Exception) as e:
+ text = "run_command='{}' Exception='{}'".format(command, str(e))
+ self.ssh_conn = None
+ self.run_command_session = None
+ raise RunCommandException(text)
def ssh_connect(self):
self.ssh_conn.connect(, username=self.user, password=self.password, key_filename=self.keyfile,
- timeout=10) #, None)
- except paramiko.ssh_exception.SSHException as e:
- text = e.args[0]
- self.logger.error("ssh_connect ssh Exception: " + text)
+ timeout=10) # auth_timeout=10)
+ self.remote_ip = self.ssh_conn.get_transport().sock.getpeername()[0]
+ self.local_ip = self.ssh_conn.get_transport().sock.getsockname()[0]
+ except (paramiko.ssh_exception.SSHException, Exception) as e:
+ text = 'ssh connect Exception: {}'.format(e)
+ self.ssh_conn = None
+ self.error = text
+ raise
def check_connectivity(self):
if not self.test:
+ # TODO change to run_command
if not self.ssh_conn:
def load_localinfo(self):
if not self.test:
- # Connect SSH
- self.ssh_connect()
- command = 'mkdir -p ' + self.image_path
- # print, ': command:', command
- (_, stdout, stderr) = self.ssh_conn.exec_command(command)
- content =
- if len(content) > 0:
- self.logger.error("command: '%s' stderr: '%s'", command, content)
- command = 'cat ' + self.image_path + '/.openvim.yaml'
- # print, ': command:', command
- (_, stdout, stderr) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
- self.logger.error("command: '%s' stderr='%s'", command,
- raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command))
- self.localinfo = yaml.load(content)
+ self.run_command('sudo mkdir -p ' + self.image_path)
+ result = self.run_command('cat {}/.openvim.yaml'.format(self.image_path))
+ self.localinfo = yaml.load(result)
js_v(self.localinfo, localinfo_schema)
self.localinfo_dirty = False
if 'server_files' not in self.localinfo:
self.localinfo['server_files'] = {}
- self.logger.debug("localinfo load from host")
+ self.logger.debug("localinfo loaded from host")
- except paramiko.ssh_exception.SSHException as e:
- text = e.args[0]
- self.logger.error("load_localinfo ssh Exception: " + text)
+ except RunCommandException as e:
+ self.logger.error("load_localinfo Exception: " + str(e))
except host_thread.lvirt_module.libvirtError as e:
text = e.get_error_message()
self.logger.error("load_localinfo libvirt Exception: " + text)
text = str(e)
self.logger.error("load_localinfo Exception: " + text)
- #not loaded, insert a default data and force saving by activating dirty flag
+ # not loaded, insert a default data and force saving by activating dirty flag
self.localinfo = {'files':{}, 'server_files':{} }
- #self.localinfo_dirty=True
+ # self.localinfo_dirty=True
def load_hostinfo(self):
if self.test:
- return;
+ return
- #Connect SSH
- self.ssh_connect()
- command = 'cat ' + self.image_path + '/hostinfo.yaml'
- #print, ': command:', command
- (_, stdout, stderr) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
- self.logger.error("command: '%s' stderr: '%s'", command,
- raise paramiko.ssh_exception.SSHException("Error empty file ")
- self.hostinfo = yaml.load(content)
+ result = self.run_command('cat {}/hostinfo.yaml'.format(self.image_path))
+ self.hostinfo = yaml.load(result)
js_v(self.hostinfo, hostinfo_schema)
- self.logger.debug("hostlinfo load from host " + str(self.hostinfo))
+ self.logger.debug("hostinfo load from host " + str(self.hostinfo))
- except paramiko.ssh_exception.SSHException as e:
- text = e.args[0]
- self.logger.error("load_hostinfo ssh Exception: " + text)
+ except RunCommandException as e:
+ self.logger.error("load_hostinfo ssh Exception: " + str(e))
except host_thread.lvirt_module.libvirtError as e:
text = e.get_error_message()
self.logger.error("load_hostinfo libvirt Exception: " + text)
except js_e.ValidationError as e:
text = ""
if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
- self.logger.error("load_hostinfo format Exception: %s %s", text, e.message)
+ self.logger.error("load_hostinfo format Exception: %s %s", text, str(e))
except Exception as e:
text = str(e)
self.logger.error("load_hostinfo Exception: " + text)
- command = 'cat > ' + self.image_path + '/.openvim.yaml'
- self.logger.debug("command:" + command)
- (stdin, _, _) = self.ssh_conn.exec_command(command)
- yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
+ command = 'cat > {}/.openvim.yaml'.format(self.image_path)
+ in_stream = self.run_command(command, keep_session=True)
+ yaml.safe_dump(self.localinfo, in_stream, explicit_start=True, indent=4, default_flow_style=False,
+ tags=False, encoding='utf-8', allow_unicode=True)
+ result = self.run_command(command, keep_session=False) # to end session
self.localinfo_dirty = False
break #while tries
- except paramiko.ssh_exception.SSHException as e:
- text = e.args[0]
- self.logger.error("save_localinfo ssh Exception: " + text)
- if "SSH session not active" in text:
- self.ssh_connect()
+ except RunCommandException as e:
+ self.logger.error("save_localinfo ssh Exception: " + str(e))
except host_thread.lvirt_module.libvirtError as e:
text = e.get_error_message()
self.logger.error("save_localinfo libvirt Exception: " + text)
self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid)
self.delete_file(localfile['source file'])
- except paramiko.ssh_exception.SSHException as e:
+ except RunCommandException as e:
self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e))
del self.localinfo['server_files'][uuid]
self.localinfo_dirty = True
- command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
- self.logger.debug("command: " + command)
- (_, stdout, _) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
- return True
- else:
- return False
- except paramiko.ssh_exception.SSHException as e:
+ self.run_command('sudo', 'ovs-vsctl', '--may-exist', 'add-br', 'br-int', '--', 'set', 'Bridge', 'br-int',
+ 'stp_enable=true')
+ return True
+ except RunCommandException as e:
self.logger.error("create_ovs_bridge ssh Exception: " + str(e))
if "SSH session not active" in str(e):
port_name = 'ovim-' + str(vlan)
command = 'sudo ovs-vsctl del-port br-int ' + port_name
- self.logger.debug("command: " + command)
- (_, stdout, _) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
- return True
- else:
- return False
- except paramiko.ssh_exception.SSHException as e:
+ self.run_command(command)
+ return True
+ except RunCommandException as e:
self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e))
- if "SSH session not active" in str(e):
- self.ssh_connect()
return False
def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
if self.test or not self.connectivity:
return True
+ if remote_ip == 'localhost':
+ if self.localhost:
+ return # TODO: Cannot create a vxlan between localhost and localhost
+ remote_ip = self.local_ip
command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
def get_file_info(self, path):
command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
- self.logger.debug("command: " + command)
- (_, stdout, _) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
- return None # file does not exist
- else:
+ try:
+ content = self.run_command(command)
return content.split(" ") # (permission, 1, owner, group, size, date, file)
+ except RunCommandException as e:
+ return None # file does not exist
def qemu_get_info(self, path):
command = 'qemu-img info ' + path
- self.logger.debug("command: " + command)
- (_, stdout, stderr) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
- error =
- self.logger.error("get_qemu_info error " + error)
- raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
- else:
- try:
- return yaml.load(content)
- except yaml.YAMLError as exc:
- text = ""
- if hasattr(exc, 'problem_mark'):
- mark = exc.problem_mark
- text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
- self.logger.error("get_qemu_info yaml format Exception " + text)
- raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
+ content = self.run_command(command)
+ try:
+ return yaml.load(content)
+ except yaml.YAMLError as exc:
+ text = ""
+ if hasattr(exc, 'problem_mark'):
+ mark = exc.problem_mark
+ text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
+ self.logger.error("get_qemu_info yaml format Exception " + text)
+ raise RunCommandException("Error getting qemu_info yaml format" + text)
def qemu_change_backing(self, inc_file, new_backing_file):
- command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
- self.logger.debug("command: " + command)
- (_, _, stderr) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
+ command = 'qemu-img rebase -u -b {} {}'.format(new_backing_file, inc_file)
+ try:
+ self.run_command(command)
return 0
- else:
- self.logger.error("qemu_change_backing error: " + content)
+ except RunCommandException as e:
+ self.logger.error("qemu_change_backing error: " + str(e))
return -1
def qemu_create_empty_disk(self, dev):
empty_disk_path = dev['source file']
- command = 'qemu-img create -f qcow2 ' + empty_disk_path + ' ' + str(dev['image_size']) + 'G'
- self.logger.debug("command: " + command)
- (_, _, stderr) = self.ssh_conn.exec_command(command)
- content =
- if len(content) == 0:
+ command = 'qemu-img create -f qcow2 {} {}G'.format(empty_disk_path, dev['image_size'])
+ try:
+ self.run_command(command)
return 0
- else:
- self.logger.error("qemu_create_empty_disk error: " + content)
+ except RunCommandException as e:
+ self.logger.error("qemu_create_empty_disk error: " + str(e))
return -1
def get_notused_filename(self, proposed_name, suffix=''):
def delete_file(self, file_name):
- command = 'rm -f '+file_name
- self.logger.debug("command: " + command)
- (_, _, stderr) = self.ssh_conn.exec_command(command)
- error_msg =
- if len(error_msg) > 0:
- raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
+ command = 'rm -f ' + file_name
+ self.run_command(command)
def copy_file(self, source, destination, perserve_time=True):
if source[0:4]=="http":
command = 'cp --no-preserve=mode'
if perserve_time:
command += ' --preserve=timestamps'
- command += " '{}' '{}'".format(source, destination)
- self.logger.debug("command: " + command)
- (_, _, stderr) = self.ssh_conn.exec_command(command)
- error_msg =
- if len(error_msg) > 0:
- raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
+ command += " '{}' '{}'".format(source, destination)
+ self.run_command(command)
def copy_remote_file(self, remote_file, use_incremental):
''' Copy a file from the repository to local folder and recursively
#create incremental image
if use_incremental_image:
local_file_inc = self.get_notused_filename(local_file, '.inc')
- command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
- self.logger.debug("command: " + command)
- (_, _, stderr) = self.ssh_conn.exec_command(command)
- error_msg =
- if len(error_msg) > 0:
- raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
+ command = 'qemu-img create -f qcow2 {} -o backing_file={}'.format(local_file_inc, local_file)
+ self.run_command(command)
local_file = local_file_inc
- qemu_info = {'file format':'qcow2'}
+ qemu_info = {'file format': 'qcow2'}
server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
import hashlib
import os
import imp
+import socket
from netaddr import IPNetwork, IPAddress, all_matching_cidrs
#import only if needed because not needed in test mode. To allow an easier installation import RADclass
from jsonschema import validate as js_v, exceptions as js_e
for numa in host.get("numas", ()):
if "hugepages_consumed" in numa:
del numa["hugepages_consumed"]
+ for core in numa.get("cores", ()):
+ if "instance_id" in core:
+ del core["instance_id"]
+ if "v_thread_id" in core:
+ del core["v_thread_id"]
result, content = my.db.new_host(host)
if result >= 0:
if content['admin_state_up']:
# create vlxan bwt OVS controller and computes
- create_vxlan_mesh(content['uuid'])
+ create_vxlan_mesh(content['uuid'], my.logger)
# return host data
change_keys_http2db(content, http2db_host, reverse=True)
dhcp_controller.delete_mac_dhcp_server(vm_ip, mac, vlan, dhcp_path)
-def create_vxlan_mesh(host_id):
+def create_vxlan_mesh(host_id, logger=None):
Create vxlan mesh across all openvimc controller and computes.
- :param host_id: host identifier
- :param host_id: host identifier
- :return:
+ :param host_id: Added compute node id. Anyway vlan is created by all compute nodes
+ :param logger: To log errors
+ :return: None
dhcp_compute_name = get_vxlan_interface("dhcp")
existing_hosts = get_hosts()
dhcp_controller = http_controller.ovim.get_dhcp_controller()
for compute in computes_available:
+ try:
+ if compute['ip_name'] != 'localhost':
+ remote_ip = socket.gethostbyname(compute['ip_name'])
+ else:
+ remote_ip = 'localhost'
+ except socket.error as e:
+ if logger:
+ logger.error("Cannot get compute node remote ip from '{}'. Skipping: {}".format(
+ compute['ip_name'], e))
+ continue
+ # vxlan ovs_controller <=> compute node
vxlan_interface_name = get_vxlan_interface(compute['id'][:8])
config_dic['host_threads'][compute['id']].insert_task("new-vxlan", dhcp_compute_name,
- dhcp_controller.create_ovs_vxlan_tunnel(vxlan_interface_name, compute['ip_name'])
- # vlxan mesh creation between openvim computes
- for count, compute_owner in enumerate(computes_available):
- for compute in computes_available:
- if compute_owner['id'] == compute['id']:
- pass
- else:
- vxlan_interface_name = get_vxlan_interface(compute_owner['id'][:8])
- dhcp_controller.create_ovs_vxlan_tunnel(vxlan_interface_name, compute_owner['ip_name'])
- config_dic['host_threads'][compute['id']].insert_task("new-vxlan",
- vxlan_interface_name,
- compute_owner['ip_name'])
+ dhcp_controller.create_ovs_vxlan_tunnel(vxlan_interface_name, remote_ip)
+ # vxlan from others compute node to cthis ompute node
+ for compute_src in computes_available:
+ if compute_src['id'] == compute['id']:
+ continue
+ config_dic['host_threads'][compute_src['id']].insert_task("new-vxlan",
+ vxlan_interface_name,
+ remote_ip)
def delete_vxlan_mesh(host_id):
if config_dic['network_type'] == 'ovs':
# create mesh with new host data
- create_vxlan_mesh(host_id)
+ create_vxlan_mesh(host_id, my.logger)
#print data
return format_out(data)