X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=edf1bbffde904ae0e548a8577f955495751d3b78;hb=60036703a8343474a54d60d2dbff89109c6b0a34;hp=030707126c7bc44a1ac3270b19531ab866d57e49;hpb=603a77b8455560b3fa836dd8c18e99ba034474c8;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 03070712..edf1bbff 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -32,6 +32,8 @@ import queue from shutil import rmtree import threading import time +import traceback +from typing import Dict from unittest.mock import Mock from importlib_metadata import entry_points @@ -370,6 +372,23 @@ class VimInteractionVdu(VimInteractionBase): if params_copy["flavor_id"].startswith("TASK-"): params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] + affinity_group_list = params_copy["affinity_group_list"] + for affinity_group in affinity_group_list: + # change task_id into affinity_group_id + if "affinity_group_id" in affinity_group and affinity_group[ + "affinity_group_id" + ].startswith("TASK-"): + affinity_group_id = task_depends[ + affinity_group["affinity_group_id"] + ] + + if not affinity_group_id: + raise NsWorkerException( + "found for {}".format(affinity_group["affinity_group_id"]) + ) + + affinity_group["affinity_group_id"] = affinity_group_id + vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] @@ -579,6 +598,7 @@ class VimInteractionVdu(VimInteractionBase): except (vimconn.VimConnException, NsWorkerException) as e: retries += 1 + self.logger.debug(traceback.format_exc()) if retries < self.max_retries_inject_ssh_key: return ( "BUILD", @@ -745,6 +765,112 @@ class VimInteractionFlavor(VimInteractionBase): return "FAILED", ro_vim_item_update +class VimInteractionAffinityGroup(VimInteractionBase): + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + affinity_group_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None, + } + + try: + if affinity_group_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_affinity_group(affinity_group_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format( + ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-affinity-or-anti-affinity-group={} {}".format( + task_id, + ro_task["target_id"], + affinity_group_vim_id, + ro_vim_item_update_ok.get("vim_details", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + affinity_group_vim_id = None + affinity_group_data = None + + if task.get("params"): + affinity_group_data = task["params"].get("affinity_group_data") + + if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"): + try: + param_affinity_group_id = task["params"]["affinity_group_data"].get( + "vim-affinity-group-id" + ) + affinity_group_vim_id = target_vim.get_affinity_group( + param_affinity_group_id + ).get("id") + except vimconn.VimConnNotFoundException: + self.logger.error( + "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}" + "could not be found at VIM. Creating a new one.".format( + task_id, ro_task["target_id"], param_affinity_group_id + ) + ) + + if not affinity_group_vim_id and affinity_group_data: + affinity_group_vim_id = target_vim.new_affinity_group( + affinity_group_data + ) + created = True + + ro_vim_item_update = { + "vim_id": affinity_group_vim_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + } + self.logger.debug( + "task={} {} new-affinity-or-anti-affinity-group={} created={}".format( + task_id, ro_task["target_id"], affinity_group_vim_id, created + ) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} new-affinity-or-anti-affinity-group:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + + return "FAILED", ro_vim_item_update + + class VimInteractionSdnNet(VimInteractionBase): @staticmethod def _match_pci(port_pci, mapping): @@ -1140,15 +1266,39 @@ class VimInteractionSdnNet(VimInteractionBase): return "DONE", ro_vim_item_update_ok -class NsWorker(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute - REFRESH_ERROR = 600 - REFRESH_IMAGE = 3600 * 10 - REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 100 - terminate = False +class ConfigValidate: + def __init__(self, config: Dict): + self.conf = config + + @property + def active(self): + # default 1 min, allowed >= 60 or -1, -1 disables periodic checks + if ( + self.conf["period"]["refresh_active"] >= 60 + or self.conf["period"]["refresh_active"] == -1 + ): + return self.conf["period"]["refresh_active"] + + return 60 + + @property + def build(self): + return self.conf["period"]["refresh_build"] + + @property + def image(self): + return self.conf["period"]["refresh_image"] + + @property + def error(self): + return self.conf["period"]["refresh_error"] + + @property + def queue_size(self): + return self.conf["period"]["queue_size"] + +class NsWorker(threading.Thread): def __init__(self, worker_index, config, plugins, db): """ @@ -1163,7 +1313,9 @@ class NsWorker(threading.Thread): self.plugin_name = "unknown" self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index - self.task_queue = queue.Queue(self.QUEUE_SIZE) + # refresh periods for created items + self.refresh_config = ConfigValidate(config) + self.task_queue = queue.Queue(self.refresh_config.queue_size) # targetvim: vimplugin class self.my_vims = {} # targetvim: vim information from database @@ -1184,6 +1336,9 @@ class NsWorker(threading.Thread): "sdn_net": VimInteractionSdnNet( self.db, self.my_vims, self.db_vims, self.logger ), + "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( + self.db, self.my_vims, self.db_vims, self.logger + ), } self.time_last_task_processed = None # lists of tasks to delete because nsrs or vnfrs has been deleted from db @@ -1534,6 +1689,7 @@ class NsWorker(threading.Thread): "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed, + "to_check_at.gt": -1, }, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False, @@ -1831,6 +1987,52 @@ class NsWorker(threading.Thread): return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) + def update_vm_refresh(self): + """Enables the VM status updates if self.refresh_config.active parameter + is not -1 and than updates the DB accordingly + + """ + try: + self.logger.debug("Checking if VM status update config") + next_refresh = time.time() + if self.refresh_config.active == -1: + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + + if next_refresh != -1: + db_ro_task_update = {} + now = time.time() + next_check_at = now + (24 * 60 * 60) + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + db_ro_task_update["to_check_at"] = next_check_at + + self.logger.debug( + "Finding tasks which to be updated to enable VM status updates" + ) + refresh_tasks = self.db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": "DONE", + "to_check_at.lt": 0, + }, + ) + self.logger.debug("Updating tasks to change the to_check_at status") + for task in refresh_tasks: + q_filter = { + "_id": task["_id"], + } + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) + + except Exception as e: + self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -1848,13 +2050,16 @@ class NsWorker(threading.Thread): next_refresh = time.time() if task["item"] in ("image", "flavor"): - next_refresh += self.REFRESH_IMAGE + next_refresh += self.refresh_config.image elif new_status == "BUILD": - next_refresh += self.REFRESH_BUILD + next_refresh += self.refresh_config.build elif new_status == "DONE": - next_refresh += self.REFRESH_ACTIVE + if self.refresh_config.active == -1: + next_refresh = -1 + else: + next_refresh += self.refresh_config.active else: - next_refresh += self.REFRESH_ERROR + next_refresh += self.refresh_config.error next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh @@ -1866,6 +2071,8 @@ class NsWorker(threading.Thread): if self.logger.getEffectiveLevel() == logging.DEBUG: self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") """ + # Check if vim status refresh is enabled again + self.update_vm_refresh() # 0: get task_status_create lock_object = None task_status_create = None @@ -2019,10 +2226,8 @@ class NsWorker(threading.Thread): # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ( - ro_task["vim_info"]["refresh_at"] - and now > ro_task["vim_info"]["refresh_at"] - ): + refresh_at = ro_task["vim_info"]["refresh_at"] + if refresh_at and refresh_at != -1 and now > refresh_at: new_status, db_vim_info_update = self.item2class[ task["item"] ].refresh(ro_task)