X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=vim_thread.py;h=780d6e5d0f6c9c285d28139a87fc4a0618243c9f;hb=95a1ae548b039be307efe8956dbc464081d9e445;hp=5460523b0b7e5e412230ba1a95d638e7f608d0d9;hpb=42026a022e4ff54d4670ea65b821838a70243771;p=osm%2FRO.git diff --git a/vim_thread.py b/vim_thread.py index 5460523b..780d6e5d 100644 --- a/vim_thread.py +++ b/vim_thread.py @@ -33,97 +33,215 @@ import time import Queue import logging import vimconn - +from db_base import db_base_Exception # from logging import Logger # import auxiliary_functions as af -# TODO: insert a logging system + +def is_task_id(id): + return True if id[:5] == "TASK." else False class vim_thread(threading.Thread): - def __init__(self, vimconn, name=None): - '''Init a thread. + def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None, db=None, db_lock=None): + """Init a thread. Arguments: 'id' number of thead 'name' name of thread 'host','user': host ip or name to manage and user 'db', 'db_lock': database class and lock to use it in exclusion - ''' + """ + self.tasksResult = {} + """ It will contain a dictionary with + task_id: + status: enqueued,done,error,deleted,processing + result: VIM result, + """ threading.Thread.__init__(self) self.vim = vimconn + self.datacenter_name = datacenter_name + self.datacenter_tenant_id = datacenter_tenant_id if not name: - self.name = vimconn["id"] + "-" + vimconn["config"]["datacenter_tenant_id"] + self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"] else: self.name = name self.logger = logging.getLogger('openmano.vim.'+self.name) + self.db = db + self.db_lock = db_lock - self.queueLock = threading.Lock() - self.taskQueue = Queue.Queue(2000) + self.task_lock = task_lock + self.task_queue = Queue.Queue(2000) - - def insert_task(self, task, *aditional): + def insert_task(self, task): try: - self.queueLock.acquire() - task = self.taskQueue.put( (task,) + aditional, timeout=5) - self.queueLock.release() - return 1, None + self.task_queue.put(task, False) + return task["id"] except Queue.Full: - return -1, "timeout inserting a task over host " + self.name + raise vimconn.vimconnException(self.name + ": timeout inserting a task") + + def del_task(self, task): + with self.task_lock: + if task["status"] == "enqueued": + task["status"] == "deleted" + return True + else: # task["status"] == "processing" + self.task_lock.release() + return False def run(self): self.logger.debug("Starting") while True: #TODO reload service while True: - self.queueLock.acquire() - if not self.taskQueue.empty(): - task = self.taskQueue.get() + if not self.task_queue.empty(): + task = self.task_queue.get() + self.task_lock.acquire() + if task["status"] == "deleted": + self.task_lock.release() + continue + task["status"] == "processing" + self.task_lock.release() else: - task = None - self.queueLock.release() - - if task is None: now=time.time() time.sleep(1) - continue - - if task[0] == 'instance': - pass - elif task[0] == 'image': - pass - elif task[0] == 'exit': - print self.name, ": processing task exit" - self.terminate() + continue + self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"], + str(task["params"]))) + if task["name"] == 'exit' or task["name"] == 'reload': + result, content = self.terminate(task) + elif task["name"] == 'new-vm': + result, content = self.new_vm(task) + elif task["name"] == 'del-vm': + result, content = self.del_vm(task) + elif task["name"] == 'new-net': + result, content = self.new_net(task) + elif task["name"] == 'del-net': + result, content = self.del_net(task) + else: + error_text = "unknown task {}".format(task["name"]) + self.logger.error(error_text) + result = False + content = error_text + + with self.task_lock: + task["status"] = "done" if result else "error" + task["result"] = content + self.task_queue.task_done() + + if task["name"] == 'exit': return 0 - elif task[0] == 'reload': - print self.name, ": processing task reload terminating and relaunching" - self.terminate() + elif task["name"] == 'reload': break - elif task[0] == 'edit-iface': - pass - elif task[0] == 'restore-iface': - pass - elif task[0] == 'new-ovsbridge': - pass - elif task[0] == 'new-vxlan': - pass - elif task[0] == 'del-ovsbridge': - pass - elif task[0] == 'del-vxlan': - pass - elif task[0] == 'create-ovs-bridge-port': - pass - elif task[0] == 'del-ovs-port': - pass - else: - self.logger.error("unknown task %s", str(task)) self.logger.debug("Finishing") - def terminate(self): - pass + def terminate(self, task): + return True, None + + def _format_vim_error_msg(self, error_text): + if len(error_text) >= 1024: + return error_text[:516] + " ... " + error_text[-500:] + return error_text + + def new_net(self, task): + try: + task_id = task["id"] + params = task["params"] + net_id = self.vim.new_network(*params) + try: + with self.db_lock: + self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id}, WHERE={"vim_net_id": task_id}) + except db_base_Exception as e: + self.logger.error("Error updating database %s", str(e)) + return True, net_id + except vimconn.vimconnException as e: + self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e)) + try: + with self.db_lock: + self.db.update_rows("instance_nets", + UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"}, + WHERE={"vim_net_id": task_id}) + except db_base_Exception as e: + self.logger.error("Error updating database %s", str(e)) + return False, str(e) + + def new_vm(self, task): + try: + params = task["params"] + task_id = task["id"] + depends = task.get("depends") + net_list = params[5] + for net in net_list: + if is_task_id(net["net_id"]): # change task_id into network_id + try: + task_net = depends[net["net_id"]] + with self.task_lock: + if task_net["status"] == "error": + return False, "Cannot create VM because depends on a network that cannot be created: " + \ + str(task_net["result"]) + elif task_net["status"] == "enqueued" or task_net["status"] == "processing": + return False, "Cannot create VM because depends on a network still not created" + network_id = task_net["result"] + net["net_id"] = network_id + except Exception as e: + return False, "Error trying to map from task_id={} to task result: {}".format(net["net_id"], + str(e)) + vm_id = self.vim.new_vminstance(*params) + try: + with self.db_lock: + self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id}, WHERE={"vim_vm_id": task_id}) + except db_base_Exception as e: + self.logger.error("Error updating database %s", str(e)) + return True, vm_id + except vimconn.vimconnException as e: + self.logger.error("Error creating VM, task=%s: %s", str(task_id), str(e)) + try: + with self.db_lock: + self.db.update_rows("instance_vms", + UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"}, + WHERE={"vim_vm_id": task_id}) + except db_base_Exception as e: + self.logger.error("Error updating database %s", str(e)) + return False, str(e) + + def del_vm(self, task): + vm_id = task["params"] + if is_task_id(vm_id): + try: + task_create = task["depends"][vm_id] + with self.task_lock: + if task_create["status"] == "error": + return True, "VM was not created. It has error: " + str(task_create["result"]) + elif task_create["status"] == "enqueued" or task_create["status"] == "processing": + return False, "Cannot delete VM because still creating" + vm_id = task_create["result"] + except Exception as e: + return False, "Error trying to get task_id='{}':".format(vm_id, str(e)) + try: + return True, self.vim.delete_vminstance(vm_id) + except vimconn.vimconnException as e: + return False, str(e) + + def del_net(self, task): + net_id = task["params"] + if is_task_id(net_id): + try: + task_create = task["depends"][net_id] + with self.task_lock: + if task_create["status"] == "error": + return True, "net was not created. It has error: " + str(task_create["result"]) + elif task_create["status"] == "enqueued" or task_create["status"] == "processing": + return False, "Cannot delete net because still creating" + net_id = task_create["result"] + except Exception as e: + return False, "Error trying to get task_id='{}':".format(net_id, str(e)) + try: + return True, self.vim.delete_network(net_id) + except vimconn.vimconnException as e: + return False, str(e) +