X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=9ed04d7bd6e894e01a6bd7710065a2e263d5edf2;hp=ee62af4f166df04e5d04802f7e1b79390c26ab6e;hb=78f474e69fde9d64e8716978b5ea38f9f5aace48;hpb=e9a26f282aa9a82e19484514c69b18b381594ab4 diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index ee62af4f..9ed04d7b 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -32,6 +32,7 @@ import queue from shutil import rmtree import threading import time +import traceback from unittest.mock import Mock from importlib_metadata import entry_points @@ -210,6 +211,7 @@ class VimInteractionNet(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-net={} created={}".format( @@ -225,7 +227,7 @@ class VimInteractionNet(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -265,11 +267,11 @@ class VimInteractionNet(VimInteractionBase): ro_vim_item_update["vim_name"] = vim_info.get("name") if vim_info["status"] in ("ERROR", "VIM_ERROR"): - if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"): - ro_vim_item_update["vim_details"] = vim_info.get("error_msg") + if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"): + ro_vim_item_update["vim_message"] = vim_info.get("error_msg") elif vim_info["status"] == "DELETED": ro_vim_item_update["vim_id"] = None - ro_vim_item_update["vim_details"] = "Deleted externally" + ro_vim_item_update["vim_message"] = "Deleted externally" else: if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: ro_vim_item_update["vim_details"] = vim_info["vim_info"] @@ -281,7 +283,7 @@ class VimInteractionNet(VimInteractionBase): ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), - ro_vim_item_update.get("vim_details") + ro_vim_item_update.get("vim_message") if ro_vim_item_update.get("vim_status") != "ACTIVE" else "", ) @@ -296,7 +298,7 @@ class VimInteractionNet(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -307,7 +309,7 @@ class VimInteractionNet(VimInteractionBase): net_vim_id, ro_task["vim_info"]["created_items"] ) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-net={}: {}".format( @@ -316,7 +318,7 @@ class VimInteractionNet(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -326,7 +328,7 @@ class VimInteractionNet(VimInteractionBase): task_id, ro_task["target_id"], net_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -369,17 +371,41 @@ class VimInteractionVdu(VimInteractionBase): if params_copy["flavor_id"].startswith("TASK-"): params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] + affinity_group_list = params_copy["affinity_group_list"] + for affinity_group in affinity_group_list: + # change task_id into affinity_group_id + if "affinity_group_id" in affinity_group and affinity_group[ + "affinity_group_id" + ].startswith("TASK-"): + affinity_group_id = task_depends[ + affinity_group["affinity_group_id"] + ] + + if not affinity_group_id: + raise NsWorkerException( + "found for {}".format(affinity_group["affinity_group_id"]) + ) + + affinity_group["affinity_group_id"] = affinity_group_id + vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] + # add to created items previous_created_volumes (healing) + if task.get("previous_created_volumes"): + for k, v in task["previous_created_volumes"].items(): + created_items[k] = v + ro_vim_item_update = { "vim_id": vim_vm_id, "vim_status": "BUILD", "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, "interfaces_vim_ids": interfaces, "interfaces": [], + "interfaces_backup": [], } self.logger.debug( "task={} {} new-vm={} created={}".format( @@ -395,7 +421,7 @@ class VimInteractionVdu(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -407,18 +433,25 @@ class VimInteractionVdu(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } try: + self.logger.debug( + "delete_vminstance: vm_vim_id={} created_items={}".format( + vm_vim_id, ro_task["vim_info"]["created_items"] + ) + ) if vm_vim_id or ro_task["vim_info"]["created_items"]: target_vim = self.my_vims[ro_task["target_id"]] target_vim.delete_vminstance( - vm_vim_id, ro_task["vim_info"]["created_items"] + vm_vim_id, + ro_task["vim_info"]["created_items"], + ro_task["vim_info"].get("volumes_to_hold", []), ) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-vm={}: {}".format( @@ -427,7 +460,7 @@ class VimInteractionVdu(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -437,7 +470,7 @@ class VimInteractionVdu(VimInteractionBase): task_id, ro_task["target_id"], vm_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -525,11 +558,11 @@ class VimInteractionVdu(VimInteractionBase): ro_vim_item_update["vim_name"] = vim_info.get("name") if vim_info["status"] in ("ERROR", "VIM_ERROR"): - if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"): - ro_vim_item_update["vim_details"] = vim_info.get("error_msg") + if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"): + ro_vim_item_update["vim_message"] = vim_info.get("error_msg") elif vim_info["status"] == "DELETED": ro_vim_item_update["vim_id"] = None - ro_vim_item_update["vim_details"] = "Deleted externally" + ro_vim_item_update["vim_message"] = "Deleted externally" else: if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: ro_vim_item_update["vim_details"] = vim_info["vim_info"] @@ -541,7 +574,7 @@ class VimInteractionVdu(VimInteractionBase): ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), - ro_vim_item_update.get("vim_details") + ro_vim_item_update.get("vim_message") if ro_vim_item_update.get("vim_status") != "ACTIVE" else "", ) @@ -578,6 +611,7 @@ class VimInteractionVdu(VimInteractionBase): except (vimconn.VimConnException, NsWorkerException) as e: retries += 1 + self.logger.debug(traceback.format_exc()) if retries < self.max_retries_inject_ssh_key: return ( "BUILD", @@ -591,7 +625,7 @@ class VimInteractionVdu(VimInteractionBase): self.logger.error( "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e) ) - ro_vim_item_update = {"vim_details": str(e)} + ro_vim_item_update = {"vim_message": str(e)} return "FAILED", ro_vim_item_update, db_task_update @@ -630,6 +664,7 @@ class VimInteractionImage(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-image={} created={}".format( @@ -645,7 +680,7 @@ class VimInteractionImage(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -659,7 +694,7 @@ class VimInteractionFlavor(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -668,7 +703,7 @@ class VimInteractionFlavor(VimInteractionBase): target_vim = self.my_vims[ro_task["target_id"]] target_vim.delete_flavor(flavor_vim_id) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-flavor={}: {}".format( @@ -677,7 +712,7 @@ class VimInteractionFlavor(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -687,7 +722,7 @@ class VimInteractionFlavor(VimInteractionBase): task_id, ro_task["target_id"], flavor_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -723,6 +758,7 @@ class VimInteractionFlavor(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-flavor={} created={}".format( @@ -738,12 +774,161 @@ class VimInteractionFlavor(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update +class VimInteractionAffinityGroup(VimInteractionBase): + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + affinity_group_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_message": "DELETED", + "vim_id": None, + } + + try: + if affinity_group_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_affinity_group(affinity_group_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_message"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format( + ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_message": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-affinity-or-anti-affinity-group={} {}".format( + task_id, + ro_task["target_id"], + affinity_group_vim_id, + ro_vim_item_update_ok.get("vim_message", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + affinity_group_vim_id = None + affinity_group_data = None + + if task.get("params"): + affinity_group_data = task["params"].get("affinity_group_data") + + if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"): + try: + param_affinity_group_id = task["params"]["affinity_group_data"].get( + "vim-affinity-group-id" + ) + affinity_group_vim_id = target_vim.get_affinity_group( + param_affinity_group_id + ).get("id") + except vimconn.VimConnNotFoundException: + self.logger.error( + "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}" + "could not be found at VIM. Creating a new one.".format( + task_id, ro_task["target_id"], param_affinity_group_id + ) + ) + + if not affinity_group_vim_id and affinity_group_data: + affinity_group_vim_id = target_vim.new_affinity_group( + affinity_group_data + ) + created = True + + ro_vim_item_update = { + "vim_id": affinity_group_vim_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} new-affinity-or-anti-affinity-group={} created={}".format( + task_id, ro_task["target_id"], affinity_group_vim_id, created + ) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} new-affinity-or-anti-affinity-group:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update + + +class VimInteractionUpdateVdu(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + if task.get("params"): + vim_vm_id = task["params"].get("vim_vm_id") + action = task["params"].get("action") + context = {action: action} + target_vim.action_vminstance(vim_vm_id, context) + # created = True + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} vm-migration done".format(task_id, ro_task["target_id"]) + ) + return "DONE", ro_vim_item_update, db_task_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} VM Migration:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + class VimInteractionSdnNet(VimInteractionBase): @staticmethod def _match_pci(port_pci, mapping): @@ -1064,6 +1249,7 @@ class VimInteractionSdnNet(VimInteractionBase): "created_items": created_items, "connected_ports": connected_ports, "vim_details": sdn_info, + "vim_message": None, "last_update": last_update, } @@ -1078,7 +1264,7 @@ class VimInteractionSdnNet(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -1090,7 +1276,7 @@ class VimInteractionSdnNet(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -1106,7 +1292,7 @@ class VimInteractionSdnNet(VimInteractionBase): isinstance(e, sdnconn.SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value ): - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" else: self.logger.error( "ro_task={} vim={} del-sdn-net={}: {}".format( @@ -1118,7 +1304,7 @@ class VimInteractionSdnNet(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -1128,13 +1314,94 @@ class VimInteractionSdnNet(VimInteractionBase): task_id, ro_task["target_id"], sdn_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) return "DONE", ro_vim_item_update_ok +class VimInteractionMigration(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + target_vim = self.my_vims[ro_task["target_id"]] + vim_interfaces = [] + created = False + created_items = {} + refreshed_vim_info = {} + + try: + if task.get("params"): + vim_vm_id = task["params"].get("vim_vm_id") + migrate_host = task["params"].get("migrate_host") + _, migrated_compute_node = target_vim.migrate_instance( + vim_vm_id, migrate_host + ) + + if migrated_compute_node: + # When VM is migrated, vdu["vim_info"] needs to be updated + vdu_old_vim_info = task["params"]["vdu_vim_info"].get( + ro_task["target_id"] + ) + + # Refresh VM to get new vim_info + vm_to_refresh_list = [vim_vm_id] + vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) + refreshed_vim_info = vim_dict[vim_vm_id] + + if refreshed_vim_info.get("interfaces"): + for old_iface in vdu_old_vim_info.get("interfaces"): + iface = next( + ( + iface + for iface in refreshed_vim_info["interfaces"] + if old_iface["vim_interface_id"] + == iface["vim_interface_id"] + ), + None, + ) + vim_interfaces.append(iface) + + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "ACTIVE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + + if refreshed_vim_info and refreshed_vim_info.get("status") not in ( + "ERROR", + "VIM_ERROR", + ): + ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"] + + if vim_interfaces: + ro_vim_item_update["interfaces"] = vim_interfaces + + self.logger.debug( + "task={} {} vm-migration done".format(task_id, ro_task["target_id"]) + ) + + return "DONE", ro_vim_item_update, db_task_update + + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} VM Migration:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + class NsWorker(threading.Thread): REFRESH_BUILD = 5 # 5 seconds REFRESH_ACTIVE = 60 # 1 minute @@ -1179,6 +1446,15 @@ class NsWorker(threading.Thread): "sdn_net": VimInteractionSdnNet( self.db, self.my_vims, self.db_vims, self.logger ), + "update": VimInteractionUpdateVdu( + self.db, self.my_vims, self.db_vims, self.logger + ), + "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( + self.db, self.my_vims, self.db_vims, self.logger + ), + "migrate": VimInteractionMigration( + self.db, self.my_vims, self.db_vims, self.logger + ), } self.time_last_task_processed = None # lists of tasks to delete because nsrs or vnfrs has been deleted from db @@ -1501,6 +1777,24 @@ class NsWorker(threading.Thread): try: while True: + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + self._log_ro_task( + None, + None, + None, + "TASK_WF", + "task_locked_time=" + + str(self.task_locked_time) + + " " + + "time_last_task_processed=" + + str(self.time_last_task_processed) + + " " + + "now=" + + str(now), + ) + """ locked = self.db.set_one( "ro_tasks", q_filter={ @@ -1541,6 +1835,128 @@ class NsWorker(threading.Thread): return None + def _get_db_all_tasks(self): + """ + Read all content of table ro_tasks to log it + :return: None + """ + try: + # Checking the content of the BD: + + # read and return + ro_task = self.db.get_list("ro_tasks") + for rt in ro_task: + self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS") + return ro_task + + except DbException as e: + self.logger.error("Database exception at _get_db_all_tasks: {}".format(e)) + except Exception as e: + self.logger.critical( + "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True + ) + + return None + + def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event): + """ + Generate a log with the following format: + + Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by; + target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id; + task_array_index;task_id;task_action;task_item;task_args + + Example: + + TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013; + 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a + ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False, + 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None, + 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED; + 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2; + CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}} + """ + try: + line = [] + i = 0 + if ro_task is not None and isinstance(ro_task, dict): + for t in ro_task["tasks"]: + line.clear() + line.append(mark) + line.append(event) + line.append(ro_task.get("_id", "")) + line.append(str(ro_task.get("locked_at", ""))) + line.append(str(ro_task.get("modified_at", ""))) + line.append(str(ro_task.get("created_at", ""))) + line.append(str(ro_task.get("to_check_at", ""))) + line.append(str(ro_task.get("locked_by", ""))) + line.append(str(ro_task.get("target_id", ""))) + line.append(str(ro_task.get("vim_info", {}).get("refresh_at", ""))) + line.append(str(ro_task.get("vim_info", ""))) + line.append(str(ro_task.get("tasks", ""))) + if isinstance(t, dict): + line.append(str(t.get("status", ""))) + line.append(str(t.get("action_id", ""))) + line.append(str(i)) + line.append(str(t.get("task_id", ""))) + line.append(str(t.get("action", ""))) + line.append(str(t.get("item", ""))) + line.append(str(t.get("find_params", ""))) + line.append(str(t.get("params", ""))) + else: + line.extend([""] * 2) + line.append(str(i)) + line.extend([""] * 5) + + i += 1 + self.logger.debug(";".join(line)) + elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict): + i = 0 + while True: + st = "tasks.{}.status".format(i) + if st not in db_ro_task_update: + break + line.clear() + line.append(mark) + line.append(event) + line.append(db_ro_task_update.get("_id", "")) + line.append(str(db_ro_task_update.get("locked_at", ""))) + line.append(str(db_ro_task_update.get("modified_at", ""))) + line.append("") + line.append(str(db_ro_task_update.get("to_check_at", ""))) + line.append(str(db_ro_task_update.get("locked_by", ""))) + line.append("") + line.append(str(db_ro_task_update.get("vim_info.refresh_at", ""))) + line.append("") + line.append(str(db_ro_task_update.get("vim_info", ""))) + line.append(str(str(db_ro_task_update).count(".status"))) + line.append(db_ro_task_update.get(st, "")) + line.append("") + line.append(str(i)) + line.extend([""] * 3) + i += 1 + self.logger.debug(";".join(line)) + + elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict): + line.clear() + line.append(mark) + line.append(event) + line.append(db_ro_task_delete.get("_id", "")) + line.append("") + line.append(db_ro_task_delete.get("modified_at", "")) + line.extend([""] * 13) + self.logger.debug(";".join(line)) + + else: + line.clear() + line.append(mark) + line.append(event) + line.extend([""] * 16) + self.logger.debug(";".join(line)) + + except Exception as e: + self.logger.error("Error logging ro_task: {}".format(e)) + def _delete_task(self, ro_task, task_index, task_depends, db_update): """ Determine if this task need to be done or superseded @@ -1552,6 +1968,7 @@ class NsWorker(threading.Thread): "created_items", False ) + self.logger.warning("Needed delete: {}".format(needed_delete)) if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? @@ -1575,6 +1992,9 @@ class NsWorker(threading.Thread): needed_delete = False if needed_delete: + self.logger.warning( + "Deleting ro_task={} task_index={}".format(ro_task, task_index) + ) return self.item2class[my_task["item"]].delete(ro_task, task_index) else: return "SUPERSEDED", None @@ -1587,7 +2007,7 @@ class NsWorker(threading.Thread): exc_info=True, ) - return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)} + return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)} def _create_task(self, ro_task, task_index, task_depends, db_update): """ @@ -1625,7 +2045,7 @@ class NsWorker(threading.Thread): ) task_status = "FAILED" - ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} + ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)} # TODO update ro_vim_item_update return task_status, ro_vim_item_update @@ -1677,8 +2097,9 @@ class NsWorker(threading.Thread): fail_on_empty=False, ) + self.logger.warning("ro_task_dependency={}".format(ro_task_dependency)) if ro_task_dependency: - for task_index, task in ro_task_dependency["tasks"]: + for task_index, task in enumerate(ro_task_dependency["tasks"]): if task["task_id"] == task_id: return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) @@ -1713,6 +2134,11 @@ class NsWorker(threading.Thread): ro_task["vim_info"]["refresh_at"] = next_refresh try: + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") + """ # 0: get task_status_create lock_object = None task_status_create = None @@ -1773,6 +2199,11 @@ class NsWorker(threading.Thread): dependency_task = dependency_ro_task["tasks"][ dependency_task_index ] + self.logger.warning( + "dependency_ro_task={} dependency_task_index={}".format( + dependency_ro_task, dependency_task_index + ) + ) if dependency_task["status"] == "SCHEDULED": dependency_not_completed = True @@ -1793,7 +2224,7 @@ class NsWorker(threading.Thread): dependency_task["item"], dependency_task_id, dependency_ro_task["vim_info"].get( - "vim_details" + "vim_message" ), ) self.logger.error( @@ -1809,6 +2240,11 @@ class NsWorker(threading.Thread): ] = dependency_ro_task["vim_info"]["vim_id"] if dependency_not_completed: + self.logger.warning( + "DEPENDENCY NOT COMPLETED {}".format( + dependency_ro_task["vim_info"]["vim_id"] + ) + ) # TODO set at vim_info.vim_details that it is waiting continue @@ -1874,12 +2310,17 @@ class NsWorker(threading.Thread): task["item"] ].refresh(ro_task) _update_refresh(new_status) + else: + # The refresh is updated to avoid set the value of "refresh_at" to + # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD, + # because it can happen that in this case the task is never processed + _update_refresh(task["status"]) except Exception as e: new_status = "FAILED" db_vim_info_update = { "vim_status": "VIM_ERROR", - "vim_details": str(e), + "vim_message": str(e), } if not isinstance( @@ -1958,6 +2399,14 @@ class NsWorker(threading.Thread): db_ro_task_update["modified_at"] = now db_ro_task_update["to_check_at"] = next_check_at + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + db_ro_task_update_log = db_ro_task_update.copy() + db_ro_task_update_log["_id"] = q_filter["_id"] + self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK") + """ + if not self.db.set_one( "ro_tasks", update_dict=db_ro_task_update, @@ -1966,6 +2415,17 @@ class NsWorker(threading.Thread): ): del db_ro_task_update["to_check_at"] del q_filter["to_check_at"] + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + self._log_ro_task( + None, + db_ro_task_update_log, + None, + "TASK_WF", + "SET_TASK " + str(q_filter), + ) + """ self.db.set_one( "ro_tasks", q_filter=q_filter, @@ -1994,7 +2454,15 @@ class NsWorker(threading.Thread): path_vim_status + "." + k: v for k, v in ro_vim_item_update.items() if k - in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces") + in ( + "vim_id", + "vim_details", + "vim_message", + "vim_name", + "vim_status", + "interfaces", + "interfaces_backup", + ) } if path_vim_status.startswith("vdur."): @@ -2047,6 +2515,21 @@ class NsWorker(threading.Thread): ).split(";")[0] self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) + + # If interfaces exists, it backups VDU interfaces in the DB for healing operations + if ro_vim_item_update.get("interfaces"): + search_key = path_vim_status + ".interfaces" + if update_dict.get(search_key): + interfaces_backup_update = { + path_vim_status + ".interfaces_backup": update_dict[search_key] + } + + self.db.set_one( + table, + q_filter={"_id": _id}, + update_dict=interfaces_backup_update, + ) + else: update_dict = {path_item + ".status": "DELETED"} self.db.set_one( @@ -2180,8 +2663,15 @@ class NsWorker(threading.Thread): if self.tasks_to_delete: self._process_delete_db_tasks() busy = False + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + _ = self._get_db_all_tasks() + """ ro_task = self._get_db_task() if ro_task: + self.logger.warning("Task to process: {}".format(ro_task)) + time.sleep(1) self._process_pending_tasks(ro_task) busy = True if not busy: