X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=d2b00d65beaa9a262b91542ef07eee49d7518ac6;hb=bf3e9a819a7c0153fa58b5140d1898147740977a;hp=ab58a1f290d9be8f975fe5c1c06aaaaff3c049d2;hpb=123de18fe672551ac782aec4188150d3fe76c961;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index ab58a1f2..d2b00d65 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -32,6 +32,8 @@ import queue from shutil import rmtree import threading import time +import traceback +from typing import Dict from unittest.mock import Mock from importlib_metadata import entry_points @@ -210,6 +212,7 @@ class VimInteractionNet(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-net={} created={}".format( @@ -225,7 +228,7 @@ class VimInteractionNet(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -265,11 +268,11 @@ class VimInteractionNet(VimInteractionBase): ro_vim_item_update["vim_name"] = vim_info.get("name") if vim_info["status"] in ("ERROR", "VIM_ERROR"): - if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"): - ro_vim_item_update["vim_details"] = vim_info.get("error_msg") + if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"): + ro_vim_item_update["vim_message"] = vim_info.get("error_msg") elif vim_info["status"] == "DELETED": ro_vim_item_update["vim_id"] = None - ro_vim_item_update["vim_details"] = "Deleted externally" + ro_vim_item_update["vim_message"] = "Deleted externally" else: if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: ro_vim_item_update["vim_details"] = vim_info["vim_info"] @@ -281,7 +284,7 @@ class VimInteractionNet(VimInteractionBase): ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), - ro_vim_item_update.get("vim_details") + ro_vim_item_update.get("vim_message") if ro_vim_item_update.get("vim_status") != "ACTIVE" else "", ) @@ -296,7 +299,7 @@ class VimInteractionNet(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -307,7 +310,7 @@ class VimInteractionNet(VimInteractionBase): net_vim_id, ro_task["vim_info"]["created_items"] ) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-net={}: {}".format( @@ -316,7 +319,7 @@ class VimInteractionNet(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -326,7 +329,7 @@ class VimInteractionNet(VimInteractionBase): task_id, ro_task["target_id"], net_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -389,14 +392,21 @@ class VimInteractionVdu(VimInteractionBase): vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] + # add to created items previous_created_volumes (healing) + if task.get("previous_created_volumes"): + for k, v in task["previous_created_volumes"].items(): + created_items[k] = v + ro_vim_item_update = { "vim_id": vim_vm_id, "vim_status": "BUILD", "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, "interfaces_vim_ids": interfaces, "interfaces": [], + "interfaces_backup": [], } self.logger.debug( "task={} {} new-vm={} created={}".format( @@ -406,13 +416,14 @@ class VimInteractionVdu(VimInteractionBase): return "BUILD", ro_vim_item_update except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.debug(traceback.format_exc()) self.logger.error( "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) ) ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -424,18 +435,25 @@ class VimInteractionVdu(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } try: + self.logger.debug( + "delete_vminstance: vm_vim_id={} created_items={}".format( + vm_vim_id, ro_task["vim_info"]["created_items"] + ) + ) if vm_vim_id or ro_task["vim_info"]["created_items"]: target_vim = self.my_vims[ro_task["target_id"]] target_vim.delete_vminstance( - vm_vim_id, ro_task["vim_info"]["created_items"] + vm_vim_id, + ro_task["vim_info"]["created_items"], + ro_task["vim_info"].get("volumes_to_hold", []), ) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-vm={}: {}".format( @@ -444,7 +462,7 @@ class VimInteractionVdu(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -454,7 +472,7 @@ class VimInteractionVdu(VimInteractionBase): task_id, ro_task["target_id"], vm_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -486,8 +504,10 @@ class VimInteractionVdu(VimInteractionBase): vim_info_info = yaml.safe_load(vim_info["vim_info"]) if vim_info_info.get("name"): vim_info["name"] = vim_info_info["name"] - except Exception: - pass + except Exception as vim_info_error: + self.logger.exception( + f"{vim_info_error} occured while getting the vim_info from yaml" + ) except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error( @@ -542,11 +562,11 @@ class VimInteractionVdu(VimInteractionBase): ro_vim_item_update["vim_name"] = vim_info.get("name") if vim_info["status"] in ("ERROR", "VIM_ERROR"): - if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"): - ro_vim_item_update["vim_details"] = vim_info.get("error_msg") + if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"): + ro_vim_item_update["vim_message"] = vim_info.get("error_msg") elif vim_info["status"] == "DELETED": ro_vim_item_update["vim_id"] = None - ro_vim_item_update["vim_details"] = "Deleted externally" + ro_vim_item_update["vim_message"] = "Deleted externally" else: if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: ro_vim_item_update["vim_details"] = vim_info["vim_info"] @@ -558,7 +578,7 @@ class VimInteractionVdu(VimInteractionBase): ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), - ro_vim_item_update.get("vim_details") + ro_vim_item_update.get("vim_message") if ro_vim_item_update.get("vim_status") != "ACTIVE" else "", ) @@ -595,6 +615,7 @@ class VimInteractionVdu(VimInteractionBase): except (vimconn.VimConnException, NsWorkerException) as e: retries += 1 + self.logger.debug(traceback.format_exc()) if retries < self.max_retries_inject_ssh_key: return ( "BUILD", @@ -608,7 +629,7 @@ class VimInteractionVdu(VimInteractionBase): self.logger.error( "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e) ) - ro_vim_item_update = {"vim_details": str(e)} + ro_vim_item_update = {"vim_message": str(e)} return "FAILED", ro_vim_item_update, db_task_update @@ -647,6 +668,7 @@ class VimInteractionImage(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-image={} created={}".format( @@ -662,7 +684,7 @@ class VimInteractionImage(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -676,7 +698,7 @@ class VimInteractionFlavor(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -685,7 +707,7 @@ class VimInteractionFlavor(VimInteractionBase): target_vim = self.my_vims[ro_task["target_id"]] target_vim.delete_flavor(flavor_vim_id) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-flavor={}: {}".format( @@ -694,7 +716,7 @@ class VimInteractionFlavor(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -704,7 +726,7 @@ class VimInteractionFlavor(VimInteractionBase): task_id, ro_task["target_id"], flavor_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -726,7 +748,7 @@ class VimInteractionFlavor(VimInteractionBase): flavor_data = task["find_params"]["flavor_data"] vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) except vimconn.VimConnNotFoundException: - pass + self.logger.warning("VimConnNotFoundException occured.") if not vim_flavor_id and task.get("params"): # CREATE @@ -740,6 +762,7 @@ class VimInteractionFlavor(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-flavor={} created={}".format( @@ -755,7 +778,7 @@ class VimInteractionFlavor(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -769,7 +792,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -778,7 +801,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): target_vim = self.my_vims[ro_task["target_id"]] target_vim.delete_affinity_group(affinity_group_vim_id) except vimconn.VimConnNotFoundException: - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" except vimconn.VimConnException as e: self.logger.error( "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format( @@ -787,7 +810,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -797,7 +820,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): task_id, ro_task["target_id"], affinity_group_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) @@ -845,6 +868,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): "created": created, "created_items": created_items, "vim_details": None, + "vim_message": None, } self.logger.debug( "task={} {} new-affinity-or-anti-affinity-group={} created={}".format( @@ -861,12 +885,54 @@ class VimInteractionAffinityGroup(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update +class VimInteractionUpdateVdu(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + if task.get("params"): + vim_vm_id = task["params"].get("vim_vm_id") + action = task["params"].get("action") + context = {action: action} + target_vim.action_vminstance(vim_vm_id, context) + # created = True + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} vm-migration done".format(task_id, ro_task["target_id"]) + ) + return "DONE", ro_vim_item_update, db_task_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} VM Migration:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + class VimInteractionSdnNet(VimInteractionBase): @staticmethod def _match_pci(port_pci, mapping): @@ -989,11 +1055,15 @@ class VimInteractionSdnNet(VimInteractionBase): try: # CREATE params = task["params"] - vlds_to_connect = params["vlds"] - associated_vim = params["target_vim"] + vlds_to_connect = params.get("vlds", []) + associated_vim = params.get("target_vim") # external additional ports additional_ports = params.get("sdn-ports") or () - _, _, vim_account_id = associated_vim.partition(":") + _, _, vim_account_id = ( + (None, None, None) + if associated_vim is None + else associated_vim.partition(":") + ) if associated_vim: # get associated VIM @@ -1187,6 +1257,7 @@ class VimInteractionSdnNet(VimInteractionBase): "created_items": created_items, "connected_ports": connected_ports, "vim_details": sdn_info, + "vim_message": None, "last_update": last_update, } @@ -1201,7 +1272,7 @@ class VimInteractionSdnNet(VimInteractionBase): ro_vim_item_update = { "vim_status": "VIM_ERROR", "created": created, - "vim_details": str(e), + "vim_message": str(e), } return "FAILED", ro_vim_item_update @@ -1213,7 +1284,7 @@ class VimInteractionSdnNet(VimInteractionBase): ro_vim_item_update_ok = { "vim_status": "DELETED", "created": False, - "vim_details": "DELETED", + "vim_message": "DELETED", "vim_id": None, } @@ -1229,7 +1300,7 @@ class VimInteractionSdnNet(VimInteractionBase): isinstance(e, sdnconn.SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value ): - ro_vim_item_update_ok["vim_details"] = "already deleted" + ro_vim_item_update_ok["vim_message"] = "already deleted" else: self.logger.error( "ro_task={} vim={} del-sdn-net={}: {}".format( @@ -1241,7 +1312,7 @@ class VimInteractionSdnNet(VimInteractionBase): ) ro_vim_item_update = { "vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e), + "vim_message": "Error while deleting: {}".format(e), } return "FAILED", ro_vim_item_update @@ -1251,25 +1322,198 @@ class VimInteractionSdnNet(VimInteractionBase): task_id, ro_task["target_id"], sdn_vim_id, - ro_vim_item_update_ok.get("vim_details", ""), + ro_vim_item_update_ok.get("vim_message", ""), ) ) return "DONE", ro_vim_item_update_ok -class NsWorker(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute - REFRESH_ERROR = 600 - REFRESH_IMAGE = 3600 * 10 - REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 100 - terminate = False +class VimInteractionMigration(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + target_vim = self.my_vims[ro_task["target_id"]] + vim_interfaces = [] + created = False + created_items = {} + refreshed_vim_info = {} + + try: + if task.get("params"): + vim_vm_id = task["params"].get("vim_vm_id") + migrate_host = task["params"].get("migrate_host") + _, migrated_compute_node = target_vim.migrate_instance( + vim_vm_id, migrate_host + ) + + if migrated_compute_node: + # When VM is migrated, vdu["vim_info"] needs to be updated + vdu_old_vim_info = task["params"]["vdu_vim_info"].get( + ro_task["target_id"] + ) + + # Refresh VM to get new vim_info + vm_to_refresh_list = [vim_vm_id] + vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) + refreshed_vim_info = vim_dict[vim_vm_id] + + if refreshed_vim_info.get("interfaces"): + for old_iface in vdu_old_vim_info.get("interfaces"): + iface = next( + ( + iface + for iface in refreshed_vim_info["interfaces"] + if old_iface["vim_interface_id"] + == iface["vim_interface_id"] + ), + None, + ) + vim_interfaces.append(iface) + + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "ACTIVE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + + if refreshed_vim_info and refreshed_vim_info.get("status") not in ( + "ERROR", + "VIM_ERROR", + ): + ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"] + + if vim_interfaces: + ro_vim_item_update["interfaces"] = vim_interfaces + + self.logger.debug( + "task={} {} vm-migration done".format(task_id, ro_task["target_id"]) + ) + + return "DONE", ro_vim_item_update, db_task_update + + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} VM Migration:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + +class VimInteractionResize(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + created = False + target_flavor_uuid = None + created_items = {} + refreshed_vim_info = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + if task.get("params"): + vim_vm_id = task["params"].get("vim_vm_id") + flavor_dict = task["params"].get("flavor_dict") + self.logger.info("flavor_dict %s", flavor_dict) + + try: + target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict) + except Exception as e: + self.logger.info("Cannot find any flavor matching %s.", str(e)) + try: + target_flavor_uuid = target_vim.new_flavor(flavor_dict) + except Exception as e: + self.logger.error("Error creating flavor at VIM %s.", str(e)) + + if target_flavor_uuid is not None: + resized_status = target_vim.resize_instance( + vim_vm_id, target_flavor_uuid + ) + + if resized_status: + # Refresh VM to get new vim_info + vm_to_refresh_list = [vim_vm_id] + vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) + refreshed_vim_info = vim_dict[vim_vm_id] + + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + + if refreshed_vim_info and refreshed_vim_info.get("status") not in ( + "ERROR", + "VIM_ERROR", + ): + ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"] + + self.logger.debug( + "task={} {} resize done".format(task_id, ro_task["target_id"]) + ) + return "DONE", ro_vim_item_update, db_task_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + +class ConfigValidate: + def __init__(self, config: Dict): + self.conf = config + + @property + def active(self): + # default 1 min, allowed >= 60 or -1, -1 disables periodic checks + if ( + self.conf["period"]["refresh_active"] >= 60 + or self.conf["period"]["refresh_active"] == -1 + ): + return self.conf["period"]["refresh_active"] + + return 60 + + @property + def build(self): + return self.conf["period"]["refresh_build"] + + @property + def image(self): + return self.conf["period"]["refresh_image"] + @property + def error(self): + return self.conf["period"]["refresh_error"] + + @property + def queue_size(self): + return self.conf["period"]["queue_size"] + + +class NsWorker(threading.Thread): def __init__(self, worker_index, config, plugins, db): """ - :param worker_index: thread index :param config: general configuration of RO, among others the process_id with the docker id where it runs :param plugins: global shared dict with the loaded plugins @@ -1281,7 +1525,9 @@ class NsWorker(threading.Thread): self.plugin_name = "unknown" self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index - self.task_queue = queue.Queue(self.QUEUE_SIZE) + # refresh periods for created items + self.refresh_config = ConfigValidate(config) + self.task_queue = queue.Queue(self.refresh_config.queue_size) # targetvim: vimplugin class self.my_vims = {} # targetvim: vim information from database @@ -1302,9 +1548,18 @@ class NsWorker(threading.Thread): "sdn_net": VimInteractionSdnNet( self.db, self.my_vims, self.db_vims, self.logger ), + "update": VimInteractionUpdateVdu( + self.db, self.my_vims, self.db_vims, self.logger + ), "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( self.db, self.my_vims, self.db_vims, self.logger ), + "migrate": VimInteractionMigration( + self.db, self.my_vims, self.db_vims, self.logger + ), + "verticalscale": VimInteractionResize( + self.db, self.my_vims, self.db_vims, self.logger + ), } self.time_last_task_processed = None # lists of tasks to delete because nsrs or vnfrs has been deleted from db @@ -1351,7 +1606,9 @@ class NsWorker(threading.Thread): try: mkdir(file_name) except FileExistsError: - pass + self.logger.exception( + "FileExistsError occured while processing vim_config." + ) file_name = file_name + "/ca_cert" @@ -1404,7 +1661,8 @@ class NsWorker(threading.Thread): self.logger.info("Unloaded {}".format(target_id)) rmtree("{}:{}".format(target_id, self.worker_index)) except FileNotFoundError: - pass # this is raised by rmtree if folder does not exist + # This is raised by rmtree if folder does not exist. + self.logger.exception("FileNotFoundError occured while unloading VIM.") except Exception as e: self.logger.error("Cannot unload {}: {}".format(target_id, e)) @@ -1576,7 +1834,7 @@ class NsWorker(threading.Thread): persistent_info={}, ) else: # sdn - plugin_name = "rosdn_" + vim["type"] + plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) step = "Loading plugin '{}'".format(plugin_name) vim_module_conn = self._load_plugin(plugin_name, "sdn") step = "Loading {}'".format(target_id) @@ -1652,6 +1910,7 @@ class NsWorker(threading.Thread): "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed, + "to_check_at.gt": -1, }, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False, @@ -1722,7 +1981,7 @@ class NsWorker(threading.Thread): 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False, 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None, - 'vim_details': None, 'refresh_at': None};1;SCHEDULED; + 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED; 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2; CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}} """ @@ -1818,6 +2077,7 @@ class NsWorker(threading.Thread): "created_items", False ) + self.logger.debug("Needed delete: {}".format(needed_delete)) if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? @@ -1841,6 +2101,9 @@ class NsWorker(threading.Thread): needed_delete = False if needed_delete: + self.logger.debug( + "Deleting ro_task={} task_index={}".format(ro_task, task_index) + ) return self.item2class[my_task["item"]].delete(ro_task, task_index) else: return "SUPERSEDED", None @@ -1853,7 +2116,7 @@ class NsWorker(threading.Thread): exc_info=True, ) - return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)} + return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)} def _create_task(self, ro_task, task_index, task_depends, db_update): """ @@ -1891,7 +2154,7 @@ class NsWorker(threading.Thread): ) task_status = "FAILED" - ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} + ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)} # TODO update ro_vim_item_update return task_status, ro_vim_item_update @@ -1943,12 +2206,59 @@ class NsWorker(threading.Thread): fail_on_empty=False, ) + self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) if ro_task_dependency: - for task_index, task in ro_task_dependency["tasks"]: + for task_index, task in enumerate(ro_task_dependency["tasks"]): if task["task_id"] == task_id: return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) + def update_vm_refresh(self): + """Enables the VM status updates if self.refresh_config.active parameter + is not -1 and than updates the DB accordingly + + """ + try: + self.logger.debug("Checking if VM status update config") + next_refresh = time.time() + if self.refresh_config.active == -1: + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + + if next_refresh != -1: + db_ro_task_update = {} + now = time.time() + next_check_at = now + (24 * 60 * 60) + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + db_ro_task_update["to_check_at"] = next_check_at + + self.logger.debug( + "Finding tasks which to be updated to enable VM status updates" + ) + refresh_tasks = self.db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": "DONE", + "to_check_at.lt": 0, + }, + ) + self.logger.debug("Updating tasks to change the to_check_at status") + for task in refresh_tasks: + q_filter = { + "_id": task["_id"], + } + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) + + except Exception as e: + self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -1966,13 +2276,16 @@ class NsWorker(threading.Thread): next_refresh = time.time() if task["item"] in ("image", "flavor"): - next_refresh += self.REFRESH_IMAGE + next_refresh += self.refresh_config.image elif new_status == "BUILD": - next_refresh += self.REFRESH_BUILD + next_refresh += self.refresh_config.build elif new_status == "DONE": - next_refresh += self.REFRESH_ACTIVE + if self.refresh_config.active == -1: + next_refresh = -1 + else: + next_refresh += self.refresh_config.active else: - next_refresh += self.REFRESH_ERROR + next_refresh += self.refresh_config.error next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh @@ -1984,6 +2297,8 @@ class NsWorker(threading.Thread): if self.logger.getEffectiveLevel() == logging.DEBUG: self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") """ + # Check if vim status refresh is enabled again + self.update_vm_refresh() # 0: get task_status_create lock_object = None task_status_create = None @@ -2044,6 +2359,11 @@ class NsWorker(threading.Thread): dependency_task = dependency_ro_task["tasks"][ dependency_task_index ] + self.logger.debug( + "dependency_ro_task={} dependency_task_index={}".format( + dependency_ro_task, dependency_task_index + ) + ) if dependency_task["status"] == "SCHEDULED": dependency_not_completed = True @@ -2064,7 +2384,7 @@ class NsWorker(threading.Thread): dependency_task["item"], dependency_task_id, dependency_ro_task["vim_info"].get( - "vim_details" + "vim_message" ), ) self.logger.error( @@ -2080,6 +2400,11 @@ class NsWorker(threading.Thread): ] = dependency_ro_task["vim_info"]["vim_id"] if dependency_not_completed: + self.logger.warning( + "DEPENDENCY NOT COMPLETED {}".format( + dependency_ro_task["vim_info"]["vim_id"] + ) + ) # TODO set at vim_info.vim_details that it is waiting continue @@ -2137,11 +2462,9 @@ class NsWorker(threading.Thread): # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ( - ro_task["vim_info"]["refresh_at"] - and now > ro_task["vim_info"]["refresh_at"] - ): - new_status, db_vim_info_update = self.item2class[ + refresh_at = ro_task["vim_info"]["refresh_at"] + if refresh_at and refresh_at != -1 and now > refresh_at: + (new_status, db_vim_info_update,) = self.item2class[ task["item"] ].refresh(ro_task) _update_refresh(new_status) @@ -2155,7 +2478,7 @@ class NsWorker(threading.Thread): new_status = "FAILED" db_vim_info_update = { "vim_status": "VIM_ERROR", - "vim_details": str(e), + "vim_message": str(e), } if not isinstance( @@ -2289,7 +2612,15 @@ class NsWorker(threading.Thread): path_vim_status + "." + k: v for k, v in ro_vim_item_update.items() if k - in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces") + in ( + "vim_id", + "vim_details", + "vim_message", + "vim_name", + "vim_status", + "interfaces", + "interfaces_backup", + ) } if path_vim_status.startswith("vdur."): @@ -2342,6 +2673,21 @@ class NsWorker(threading.Thread): ).split(";")[0] self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) + + # If interfaces exists, it backups VDU interfaces in the DB for healing operations + if ro_vim_item_update.get("interfaces"): + search_key = path_vim_status + ".interfaces" + if update_dict.get(search_key): + interfaces_backup_update = { + path_vim_status + ".interfaces_backup": update_dict[search_key] + } + + self.db.set_one( + table, + q_filter={"_id": _id}, + update_dict=interfaces_backup_update, + ) + else: update_dict = {path_item + ".status": "DELETED"} self.db.set_one( @@ -2482,6 +2828,8 @@ class NsWorker(threading.Thread): """ ro_task = self._get_db_task() if ro_task: + self.logger.debug("Task to process: {}".format(ro_task)) + time.sleep(1) self._process_pending_tasks(ro_task) busy = True if not busy: