From 56d877dad3d7ce6a36e233a77d1ee7dc8295e233 Mon Sep 17 00:00:00 2001 From: tierno Date: Mon, 15 Jan 2018 13:59:05 +0100 Subject: [PATCH] vim thread logging enhancement Change-Id: I2d6c3dd819e5fd45b37cea6828df53a038409f11 Signed-off-by: tierno --- openmanod | 2 +- osm_ro/nfvo.py | 2 +- osm_ro/vim_thread.py | 215 +++++++++++++++++++++++-------------------- 3 files changed, 118 insertions(+), 101 deletions(-) diff --git a/openmanod b/openmanod index b9858b47..5bff6513 100755 --- a/openmanod +++ b/openmanod @@ -48,7 +48,7 @@ import osm_ro __author__ = "Alfonso Tierno, Gerardo Garcia, Pablo Montes" __date__ = "$26-aug-2014 11:09:29$" -__version__ = "0.5.47-r557" +__version__ = "0.5.48-r558" version_date = "Jan 2018" database_version = 27 # expected database schema version diff --git a/osm_ro/nfvo.py b/osm_ro/nfvo.py index a6d3ba3e..0a357816 100644 --- a/osm_ro/nfvo.py +++ b/osm_ro/nfvo.py @@ -3761,7 +3761,7 @@ def new_datacenter(mydb, datacenter_descriptor): except (IOError, ImportError): # if module_info and module_info[0]: # file.close(module_info[0]) - raise NfvoException("Incorrect datacenter type '{}'. Plugin '{}'.py not installed".format(datacenter_type, module), HTTP_Bad_Request) + raise NfvoException("Incorrect datacenter type '{}'. Plugin '{}.py' not installed".format(datacenter_type, module), HTTP_Bad_Request) datacenter_id = mydb.new_row("datacenters", datacenter_descriptor, add_uuid=True, confidential_data=True) return datacenter_id diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index e401502f..ce0fc7f5 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -134,61 +134,73 @@ class vim_thread(threading.Thread): Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions :return: None """ - action_completed = False - task_list = [] - old_action_key = None - - old_item_id = "" - old_item = "" - old_created_at = 0.0 - database_limit = 200 - while True: - # get 200 (database_limit) entries each time - with self.db_lock: - vim_actions = self.db.get_rows(FROM="vim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, - "item_id>=": old_item_id}, - ORDER_BY=("item_id", "item", "created_at",), - LIMIT=database_limit) - for task in vim_actions: - item = task["item"] - item_id = task["item_id"] - - # skip the first entries that are already processed in the previous pool of 200 - if old_item_id: - if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at: - old_item_id = False # next one will be a new un-processed task - continue + try: + action_completed = False + task_list = [] + old_action_key = None + + old_item_id = "" + old_item = "" + old_created_at = 0.0 + database_limit = 200 + while True: + # get 200 (database_limit) entries each time + with self.db_lock: + vim_actions = self.db.get_rows(FROM="vim_actions", + WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + "item_id>=": old_item_id}, + ORDER_BY=("item_id", "item", "created_at",), + LIMIT=database_limit) + for task in vim_actions: + item = task["item"] + item_id = task["item_id"] + + # skip the first entries that are already processed in the previous pool of 200 + if old_item_id: + if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at: + old_item_id = False # next one will be a new un-processed task + continue - action_key = item + item_id - if old_action_key != action_key: - if not action_completed and task_list: - # This will fill needed task parameters into memory, and insert the task if needed in - # self.pending_tasks or self.refresh_tasks - self._insert_pending_tasks(task_list) - task_list = [] - old_action_key = action_key - action_completed = False - elif action_completed: - continue + action_key = item + item_id + if old_action_key != action_key: + if not action_completed and task_list: + # This will fill needed task parameters into memory, and insert the task if needed in + # self.pending_tasks or self.refresh_tasks + try: + self._insert_pending_tasks(task_list) + except Exception as e: + self.logger.critical( + "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e), + exc_info=True) + task_list = [] + old_action_key = action_key + action_completed = False + elif action_completed: + continue - if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND": - task_list.append(task) - elif task["action"] == "DELETE": - # action completed because deleted and status is not SCHEDULED. Not needed anything - action_completed = True - if len(vim_actions) == database_limit: - # update variables for get the next database iteration - old_item_id = item_id - old_item = item - old_created_at = task["created_at"] - else: - break - # Last actions group need to be inserted too - if not action_completed and task_list: - self._insert_pending_tasks(task_list) - self.logger.debug("reloaded vim actions pending:{} refresh:{}".format( - len(self.pending_tasks), len(self.refresh_tasks))) + if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND": + task_list.append(task) + elif task["action"] == "DELETE": + # action completed because deleted and status is not SCHEDULED. Not needed anything + action_completed = True + if len(vim_actions) == database_limit: + # update variables for get the next database iteration + old_item_id = item_id + old_item = item + old_created_at = task["created_at"] + else: + break + # Last actions group need to be inserted too + if not action_completed and task_list: + try: + self._insert_pending_tasks(task_list) + except Exception as e: + self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e), + exc_info=True) + self.logger.debug("reloaded vim actions pending:{} refresh:{}".format( + len(self.pending_tasks), len(self.refresh_tasks))) + except Exception as e: + self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True) def _refres_elements(self): """Call VIM to get VMs and networks status until 10 elements""" @@ -217,8 +229,8 @@ class vim_thread(threading.Thread): net_to_refresh_list.append(task["vim_id"]) net_to_refresh_dict[task["vim_id"]] = task else: - error_text = "unknown task {}".format(task["item"]) - self.logger.error(error_text) + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True) items_to_refresh += 1 if items_to_refresh == 10: break @@ -229,7 +241,7 @@ class vim_thread(threading.Thread): vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list) except vimconn.vimconnException as e: # Mark all tasks at VIM_ERROR status - self.logger.error("vimconnException Exception when trying to refresh vms " + str(e)) + self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e)) vim_dict = {} for vim_id in vm_to_refresh_list: vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -238,15 +250,16 @@ class vim_thread(threading.Thread): # look for task task_need_update = False task = vm_to_refresh_dict[vim_id] - self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info)) + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info)) # check and update interfaces task_warning_msg = "" for interface in vim_info.get("interfaces", ()): vim_interface_id = interface["vim_interface_id"] if vim_interface_id not in task["extra"]["interfaces"]: - self.logger.critical("Interface not found {} on task info {}".format( - vim_interface_id, task["extra"]["interfaces"]), exc_info=True) + self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format( + task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True) continue task_interface = task["extra"]["interfaces"][vim_interface_id] task_vim_interface = task["vim_interfaces"].get(vim_interface_id) @@ -259,9 +272,9 @@ class vim_thread(threading.Thread): task_interface["sdn_port_id"] = None task_need_update = True except ovimException as e: - error_text =" ovimException deleting external_port={} {}".format( + error_text = "ovimException deleting external_port={}: {}".format( task_interface["sdn_port_id"], e) - self.logger.error(error_text, exc_info=True) + self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) task_warning_msg += error_text # TODO Set error_msg at instance_nets instead of instance VMs @@ -283,12 +296,12 @@ class vim_thread(threading.Thread): task_interface["sdn_port_id"] = sdn_port_id task_need_update = True except (ovimException, Exception) as e: - error_text = "ovimException creating new_external_port compute_node={} "\ - "pci={} vlan={} {}".format( + error_text = "ovimException creating new_external_port compute_node={}"\ + " pci={} vlan={} {}".format( interface["compute_node"], interface["pci"], interface.get("vlan"), e) - self.logger.error(error_text, exc_info=True) + self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) task_warning_msg += error_text # TODO Set error_msg at instance_nets instead of instance VMs @@ -301,8 +314,7 @@ class vim_thread(threading.Thread): "sdn_port_id": task_interface.get("sdn_port_id"), "compute_node": interface.get("compute_node"), "pci": interface.get("pci"), - "vlan": interface.get("vlan"), - }, + "vlan": interface.get("vlan")}, WHERE={'uuid': task_interface["iface_id"]}) task["vim_interfaces"][vim_interface_id] = interface @@ -347,7 +359,7 @@ class vim_thread(threading.Thread): vim_dict = self.vim.refresh_nets_status(net_to_refresh_list) except vimconn.vimconnException as e: # Mark all tasks at VIM_ERROR status - self.logger.error("vimconnException Exception when trying to refresh nets " + str(e)) + self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e)) vim_dict = {} for vim_id in net_to_refresh_list: vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -355,7 +367,8 @@ class vim_thread(threading.Thread): for vim_id, vim_info in vim_dict.items(): # look for task task = net_to_refresh_dict[vim_id] - self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info)) + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info)) task_vim_info = task.get("vim_info") task_vim_status = task["extra"].get("vim_status") @@ -368,8 +381,8 @@ class vim_thread(threading.Thread): with self.db_lock: sdn_net = self.ovim.show_network(task_sdn_net_id) except (ovimException, Exception) as e: - text_error = "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id) - self.logger.error(text_error, exc_info = True) + text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e) + self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True) sdn_net = {"status": "ERROR", "error_msg": text_error} if sdn_net["status"] == "ERROR": if not vim_info.get("error_msg"): @@ -402,7 +415,7 @@ class vim_thread(threading.Thread): UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), "error_msg": task.get("error_msg"), "modified_at": now}, WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) + 'task_index': task['task_index']}) if task["extra"].get("vim_status") == "BUILD": self._insert_refresh(task, now + self.REFRESH_BUILD) else: @@ -428,7 +441,7 @@ class vim_thread(threading.Thread): else: index = len(self.refresh_tasks) self.refresh_tasks.append(task) - self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format( + self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format( task_id, task_name, task["modified_at"], index)) def _remove_refresh(self, task_name, vim_id): @@ -444,7 +457,7 @@ class vim_thread(threading.Thread): break else: return False - if index_to_delete != None: + if not index_to_delete: del self.refresh_tasks[index_to_delete] return True @@ -495,7 +508,7 @@ class vim_thread(threading.Thread): result = True database_update = None elif not self.vim: - task["status"] == "ERROR" + task["status"] = "ERROR" task["error_msg"] = self.error_status result = False database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} @@ -524,7 +537,11 @@ class vim_thread(threading.Thread): result = False task["error_msg"] = str(e) task["status"] = "FAILED" - database_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": task["error_msg"]} + database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} + if task["item"] == 'instance_vms': + database_update["vim_vm_id"] = None + elif task["item"] == 'instance_nets': + database_update["vim_net_id"] = None if task["action"] == "DELETE": action_key = task["item"] + task["item_id"] @@ -532,10 +549,10 @@ class vim_thread(threading.Thread): elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"): self._insert_refresh(task) - self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format( - task["instance_action_id"], task["task_index"], task["item"], task["action"], - task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), - task["params"])) + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format( + task_id, task["item"], task["action"], task["status"], + task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"])) try: now = time.time() with self.db_lock: @@ -556,14 +573,13 @@ class vim_thread(threading.Thread): UPDATE=database_update, WHERE={"uuid": task["item_id"]}) except db_base_Exception as e: - self.logger.error("Error updating database %s", str(e), exc_info=True) + self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True) if nb_created == 10: break return nb_processed def _insert_pending_tasks(self, vim_actions_list): - now = time.time() for task in vim_actions_list: if task["datacenter_vim_id"] != self.datacenter_tenant_id: continue @@ -640,7 +656,7 @@ class vim_thread(threading.Thread): def del_task(self, task): with self.task_lock: if task["status"] == "SCHEDULED": - task["status"] == "SUPERSEDED" + task["status"] = "SUPERSEDED" return True else: # task["status"] == "processing" self.task_lock.release() @@ -651,6 +667,7 @@ class vim_thread(threading.Thread): while True: self._reload_vim_actions() reload_thread = False + while True: try: while not self.task_queue.empty(): @@ -676,14 +693,11 @@ class vim_thread(threading.Thread): self.logger.debug("Finishing") - def terminate(self, task): - return True, None - def _look_for_task(self, instance_action_id, task_id): task_index = task_id.split("-")[-1] with self.db_lock: tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id, - "task_index": task_index}) + "task_index": task_index}) if not tasks: return None task = tasks[0] @@ -699,18 +713,18 @@ class vim_thread(threading.Thread): task["extra"] = {} return task - def _format_vim_error_msg(self, error_text, max_length=1024): + @staticmethod + def _format_vim_error_msg(error_text, max_length=1024): if error_text and len(error_text) >= max_length: return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:] return error_text def new_vm(self, task): + task_id = task["instance_action_id"] + "." + str(task["task_index"]) try: params = task["params"] - task_id = task["instance_action_id"] + "." + str(task["task_index"]) depends = task.get("depends") net_list = params[5] - error_text = "" for net in net_list: if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id task_dependency = task["depends"].get(net["net_id"]) @@ -733,14 +747,15 @@ class vim_thread(threading.Thread): for iface in net_list: task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]} with self.db_lock: - result = self.db.get_rows(SELECT=('sdn_net_id',), + result = self.db.get_rows( + SELECT=('sdn_net_id',), FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid', WHERE={'ii.uuid': iface["uuid"]}) if result: task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id'] else: - self.logger.critical("Error creating VM, task=%s Network {} not found at DB", task_id, - iface["uuid"], exc_info=True) + self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id, + iface["uuid"]), exc_info=True) task["vim_info"] = {} task["vim_interfaces"] = {} @@ -754,7 +769,7 @@ class vim_thread(threading.Thread): return True, instance_element_update except (vimconn.vimconnException, VimThreadException) as e: - self.logger.error("Error creating VM, task=%s: %s", task_id, str(e)) + self.logger.error("task={} new-VM: {}".format(task_id, e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text task["status"] = "FAILED" @@ -763,6 +778,7 @@ class vim_thread(threading.Thread): return False, instance_element_update def del_vm(self, task): + task_id = task["instance_action_id"] + "." + str(task["task_index"]) vm_vim_id = task["vim_id"] interfaces = task["extra"].get("interfaces", ()) try: @@ -772,8 +788,8 @@ class vim_thread(threading.Thread): with self.db_lock: self.ovim.delete_port(iface["sdn_port_id"]) except ovimException as e: - self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format( - iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True) + self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format( + task_id, iface["sdn_port_id"], e), exc_info=True) # TODO Set error_msg at instance_nets self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items")) @@ -825,15 +841,15 @@ class vim_thread(threading.Thread): return instance_element_update def get_net(self, task): + task_id = task["instance_action_id"] + "." + str(task["task_index"]) try: - task_id = task["instance_action_id"] + "." + str(task["task_index"]) params = task["params"] filter_param = params[0] instance_element_update = self._get_net_internal(task, filter_param) return True, instance_element_update except (vimconn.vimconnException, VimThreadException) as e: - self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e)) + self.logger.error("task={} get-net: {}".format(task_id, e)) task["status"] = "FAILED" task["vim_id"] = None task["error_msg"] = self._format_vim_error_msg(str(e)) @@ -844,8 +860,9 @@ class vim_thread(threading.Thread): def new_net(self, task): vim_net_id = None sdn_net_id = None + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + action_text = "" try: - task_id = task["instance_action_id"] + "." + str(task["task_index"]) # FIND if task["extra"].get("find"): action_text = "finding" @@ -886,7 +903,7 @@ class vim_thread(threading.Thread): "created": True, "error_msg": None} return True, instance_element_update except (vimconn.vimconnException, ovimException) as e: - self.logger.error("Error {} NET, task={}: {}".format(action_text, task_id, e)) + self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e)) task["status"] = "FAILED" task["vim_id"] = vim_net_id task["error_msg"] = self._format_vim_error_msg(str(e)) -- 2.25.1