X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=6336d9e60f50d5174228ec788f8016614f3f44a9;hb=93f4addd7625efbb4e0e9042c65908e6fb5714f6;hp=8a5c5c1e336ff6a8f4a49b86e1f009b3d661fcc9;hpb=1c21a746c7290ecb839a5d03c7573349d2a5dec4;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 8a5c5c1e..6336d9e6 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -33,6 +33,7 @@ from shutil import rmtree import threading import time import traceback +from typing import Dict from unittest.mock import Mock from importlib_metadata import entry_points @@ -488,8 +489,10 @@ class VimInteractionVdu(VimInteractionBase): vim_info_info = yaml.safe_load(vim_info["vim_info"]) if vim_info_info.get("name"): vim_info["name"] = vim_info_info["name"] - except Exception: - pass + except Exception as vim_info_error: + self.logger.exception( + f"{vim_info_error} occured while getting the vim_info from yaml" + ) except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error( @@ -729,7 +732,7 @@ class VimInteractionFlavor(VimInteractionBase): flavor_data = task["find_params"]["flavor_data"] vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) except vimconn.VimConnNotFoundException: - pass + self.logger.exception("VimConnNotFoundException occured.") if not vim_flavor_id and task.get("params"): # CREATE @@ -974,7 +977,6 @@ class VimInteractionSdnNet(VimInteractionBase): return self.new(ro_task, task_create_index, None) def new(self, ro_task, task_index, task_depends): - task = ro_task["tasks"][task_index] task_id = task["task_id"] target_vim = self.my_vims[ro_task["target_id"]] @@ -1265,15 +1267,39 @@ class VimInteractionSdnNet(VimInteractionBase): return "DONE", ro_vim_item_update_ok -class NsWorker(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute - REFRESH_ERROR = 600 - REFRESH_IMAGE = 3600 * 10 - REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 100 - terminate = False +class ConfigValidate: + def __init__(self, config: Dict): + self.conf = config + + @property + def active(self): + # default 1 min, allowed >= 60 or -1, -1 disables periodic checks + if ( + self.conf["period"]["refresh_active"] >= 60 + or self.conf["period"]["refresh_active"] == -1 + ): + return self.conf["period"]["refresh_active"] + + return 60 + + @property + def build(self): + return self.conf["period"]["refresh_build"] + + @property + def image(self): + return self.conf["period"]["refresh_image"] + @property + def error(self): + return self.conf["period"]["refresh_error"] + + @property + def queue_size(self): + return self.conf["period"]["queue_size"] + + +class NsWorker(threading.Thread): def __init__(self, worker_index, config, plugins, db): """ @@ -1288,7 +1314,9 @@ class NsWorker(threading.Thread): self.plugin_name = "unknown" self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index - self.task_queue = queue.Queue(self.QUEUE_SIZE) + # refresh periods for created items + self.refresh_config = ConfigValidate(config) + self.task_queue = queue.Queue(self.refresh_config.queue_size) # targetvim: vimplugin class self.my_vims = {} # targetvim: vim information from database @@ -1358,7 +1386,9 @@ class NsWorker(threading.Thread): try: mkdir(file_name) except FileExistsError: - pass + self.logger.exception( + "FileExistsError occured while processing vim_config." + ) file_name = file_name + "/ca_cert" @@ -1411,7 +1441,8 @@ class NsWorker(threading.Thread): self.logger.info("Unloaded {}".format(target_id)) rmtree("{}:{}".format(target_id, self.worker_index)) except FileNotFoundError: - pass # this is raised by rmtree if folder does not exist + # This is raised by rmtree if folder does not exist. + self.logger.exception("FileNotFoundError occured while unloading VIM.") except Exception as e: self.logger.error("Cannot unload {}: {}".format(target_id, e)) @@ -1662,6 +1693,7 @@ class NsWorker(threading.Thread): "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed, + "to_check_at.gt": -1, }, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False, @@ -1959,6 +1991,66 @@ class NsWorker(threading.Thread): return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) + def update_vm_refresh(self, ro_task): + """Enables the VM status updates if self.refresh_config.active parameter + is not -1 and then updates the DB accordingly + + """ + try: + self.logger.debug("Checking if VM status update config") + next_refresh = time.time() + next_refresh = self._get_next_refresh(ro_task, next_refresh) + + if next_refresh != -1: + db_ro_task_update = {} + now = time.time() + next_check_at = now + (24 * 60 * 60) + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + db_ro_task_update["to_check_at"] = next_check_at + + self.logger.debug( + "Finding tasks which to be updated to enable VM status updates" + ) + refresh_tasks = self.db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": "DONE", + "to_check_at.lt": 0, + }, + ) + self.logger.debug("Updating tasks to change the to_check_at status") + for task in refresh_tasks: + q_filter = { + "_id": task["_id"], + } + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) + + except Exception as e: + self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + + def _get_next_refresh(self, ro_task: dict, next_refresh: float): + """Decide the next_refresh according to vim type and refresh config period. + Args: + ro_task (dict): ro_task details + next_refresh (float): next refresh time as epoch format + + Returns: + next_refresh (float) -1 if vm updates are disabled or vim type is openstack. + """ + target_vim = ro_task["target_id"] + vim_type = self.db_vims[target_vim]["vim_type"] + if self.refresh_config.active == -1 or vim_type == "openstack": + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + return next_refresh + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -1976,13 +2068,13 @@ class NsWorker(threading.Thread): next_refresh = time.time() if task["item"] in ("image", "flavor"): - next_refresh += self.REFRESH_IMAGE + next_refresh += self.refresh_config.image elif new_status == "BUILD": - next_refresh += self.REFRESH_BUILD + next_refresh += self.refresh_config.build elif new_status == "DONE": - next_refresh += self.REFRESH_ACTIVE + next_refresh = self._get_next_refresh(ro_task, next_refresh) else: - next_refresh += self.REFRESH_ERROR + next_refresh += self.refresh_config.error next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh @@ -1994,6 +2086,8 @@ class NsWorker(threading.Thread): if self.logger.getEffectiveLevel() == logging.DEBUG: self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") """ + # Check if vim status refresh is enabled again + self.update_vm_refresh(ro_task) # 0: get task_status_create lock_object = None task_status_create = None @@ -2101,7 +2195,10 @@ class NsWorker(threading.Thread): ) if task["action"] == "DELETE": - (new_status, db_vim_info_update,) = self._delete_task( + ( + new_status, + db_vim_info_update, + ) = self._delete_task( ro_task, task_index, task_depends, db_ro_task_update ) new_status = ( @@ -2147,10 +2244,8 @@ class NsWorker(threading.Thread): # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ( - ro_task["vim_info"]["refresh_at"] - and now > ro_task["vim_info"]["refresh_at"] - ): + refresh_at = ro_task["vim_info"]["refresh_at"] + if refresh_at and refresh_at != -1 and now > refresh_at: new_status, db_vim_info_update = self.item2class[ task["item"] ].refresh(ro_task)