X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=d278957f9201b4715f2329691ba9ff2e99513f4b;hb=24a2b5184371649bff2806b9a72473f315787ef2;hp=cb52d77cc5275c239f14a910fdb0efd11cfe479d;hpb=7b521f73dd996a279f23b2f512cd89a42c1c79f6;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index cb52d77c..d278957f 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -27,9 +27,9 @@ A ro_task can contain several 'tasks', each one with a target, where to store th from copy import deepcopy from http import HTTPStatus import logging -from os import mkdir +from os import makedirs +from os import path import queue -from shutil import rmtree import threading import time import traceback @@ -39,7 +39,8 @@ from unittest.mock import Mock from importlib_metadata import entry_points from osm_common.dbbase import DbException from osm_ng_ro.vim_admin import LockRenew -from osm_ro_plugin import sdnconn, vimconn +from osm_ro_plugin import sdnconn +from osm_ro_plugin import vimconn from osm_ro_plugin.sdn_dummy import SdnDummyConnector from osm_ro_plugin.vim_dummy import VimDummyConnector import yaml @@ -747,8 +748,10 @@ class VimInteractionFlavor(VimInteractionBase): try: flavor_data = task["find_params"]["flavor_data"] vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) - except vimconn.VimConnNotFoundException: - self.logger.exception("VimConnNotFoundException occured.") + except vimconn.VimConnNotFoundException as flavor_not_found_msg: + self.logger.warning( + f"VimConnNotFoundException occured: {flavor_not_found_msg}" + ) if not vim_flavor_id and task.get("params"): # CREATE @@ -1037,7 +1040,6 @@ class VimInteractionSdnNet(VimInteractionBase): return self.new(ro_task, task_create_index, None) def new(self, ro_task, task_index, task_depends): - task = ro_task["tasks"][task_index] task_id = task["task_id"] target_vim = self.my_vims[ro_task["target_id"]] @@ -1055,11 +1057,15 @@ class VimInteractionSdnNet(VimInteractionBase): try: # CREATE params = task["params"] - vlds_to_connect = params["vlds"] - associated_vim = params["target_vim"] + vlds_to_connect = params.get("vlds", []) + associated_vim = params.get("target_vim") # external additional ports additional_ports = params.get("sdn-ports") or () - _, _, vim_account_id = associated_vim.partition(":") + _, _, vim_account_id = ( + (None, None, None) + if associated_vim is None + else associated_vim.partition(":") + ) if associated_vim: # get associated VIM @@ -1583,7 +1589,7 @@ class NsWorker(threading.Thread): self.task_lock.release() return False - def _process_vim_config(self, target_id, db_vim): + def _process_vim_config(self, target_id: str, db_vim: dict) -> None: """ Process vim config, creating vim configuration files as ca_cert :param target_id: vim/sdn/wim + id @@ -1594,17 +1600,14 @@ class NsWorker(threading.Thread): return file_name = "" + work_dir = "/app/osm_ro/certs" try: if db_vim["config"].get("ca_cert_content"): - file_name = "{}:{}".format(target_id, self.worker_index) + file_name = f"{work_dir}/{target_id}:{self.worker_index}" - try: - mkdir(file_name) - except FileExistsError: - self.logger.exception( - "FileExistsError occured while processing vim_config." - ) + if not path.isdir(file_name): + makedirs(file_name) file_name = file_name + "/ca_cert" @@ -1655,10 +1658,6 @@ class NsWorker(threading.Thread): self.vim_targets.remove(target_id) self.logger.info("Unloaded {}".format(target_id)) - rmtree("{}:{}".format(target_id, self.worker_index)) - except FileNotFoundError: - # This is raised by rmtree if folder does not exist. - self.logger.exception("FileNotFoundError occured while unloading VIM.") except Exception as e: self.logger.error("Cannot unload {}: {}".format(target_id, e)) @@ -1830,14 +1829,17 @@ class NsWorker(threading.Thread): persistent_info={}, ) else: # sdn - plugin_name = "rosdn_" + vim["type"] + plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) step = "Loading plugin '{}'".format(plugin_name) vim_module_conn = self._load_plugin(plugin_name, "sdn") step = "Loading {}'".format(target_id) wim = deepcopy(vim) wim_config = wim.pop("config", {}) or {} wim["uuid"] = wim["_id"] - wim["wim_url"] = wim["url"] + if "url" in wim and "wim_url" not in wim: + wim["wim_url"] = wim["url"] + elif "url" not in wim and "wim_url" in wim: + wim["url"] = wim["wim_url"] if wim.get("dpid"): wim_config["dpid"] = wim.pop("dpid") @@ -2073,7 +2075,7 @@ class NsWorker(threading.Thread): "created_items", False ) - self.logger.warning("Needed delete: {}".format(needed_delete)) + self.logger.debug("Needed delete: {}".format(needed_delete)) if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? @@ -2097,7 +2099,7 @@ class NsWorker(threading.Thread): needed_delete = False if needed_delete: - self.logger.warning( + self.logger.debug( "Deleting ro_task={} task_index={}".format(ro_task, task_index) ) return self.item2class[my_task["item"]].delete(ro_task, task_index) @@ -2202,25 +2204,22 @@ class NsWorker(threading.Thread): fail_on_empty=False, ) - self.logger.warning("ro_task_dependency={}".format(ro_task_dependency)) + self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) if ro_task_dependency: for task_index, task in enumerate(ro_task_dependency["tasks"]): if task["task_id"] == task_id: return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) - def update_vm_refresh(self): + def update_vm_refresh(self, ro_task): """Enables the VM status updates if self.refresh_config.active parameter - is not -1 and than updates the DB accordingly + is not -1 and then updates the DB accordingly """ try: self.logger.debug("Checking if VM status update config") next_refresh = time.time() - if self.refresh_config.active == -1: - next_refresh = -1 - else: - next_refresh += self.refresh_config.active + next_refresh = self._get_next_refresh(ro_task, next_refresh) if next_refresh != -1: db_ro_task_update = {} @@ -2255,6 +2254,23 @@ class NsWorker(threading.Thread): except Exception as e: self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + def _get_next_refresh(self, ro_task: dict, next_refresh: float): + """Decide the next_refresh according to vim type and refresh config period. + Args: + ro_task (dict): ro_task details + next_refresh (float): next refresh time as epoch format + + Returns: + next_refresh (float) -1 if vm updates are disabled or vim type is openstack. + """ + target_vim = ro_task["target_id"] + vim_type = self.db_vims[target_vim]["vim_type"] + if self.refresh_config.active == -1 or vim_type == "openstack": + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + return next_refresh + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -2276,10 +2292,7 @@ class NsWorker(threading.Thread): elif new_status == "BUILD": next_refresh += self.refresh_config.build elif new_status == "DONE": - if self.refresh_config.active == -1: - next_refresh = -1 - else: - next_refresh += self.refresh_config.active + next_refresh = self._get_next_refresh(ro_task, next_refresh) else: next_refresh += self.refresh_config.error @@ -2294,7 +2307,7 @@ class NsWorker(threading.Thread): self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") """ # Check if vim status refresh is enabled again - self.update_vm_refresh() + self.update_vm_refresh(ro_task) # 0: get task_status_create lock_object = None task_status_create = None @@ -2355,7 +2368,7 @@ class NsWorker(threading.Thread): dependency_task = dependency_ro_task["tasks"][ dependency_task_index ] - self.logger.warning( + self.logger.debug( "dependency_ro_task={} dependency_task_index={}".format( dependency_ro_task, dependency_task_index ) @@ -2412,7 +2425,10 @@ class NsWorker(threading.Thread): ) if task["action"] == "DELETE": - (new_status, db_vim_info_update,) = self._delete_task( + ( + new_status, + db_vim_info_update, + ) = self._delete_task( ro_task, task_index, task_depends, db_ro_task_update ) new_status = ( @@ -2460,7 +2476,10 @@ class NsWorker(threading.Thread): else: refresh_at = ro_task["vim_info"]["refresh_at"] if refresh_at and refresh_at != -1 and now > refresh_at: - (new_status, db_vim_info_update,) = self.item2class[ + ( + new_status, + db_vim_info_update, + ) = self.item2class[ task["item"] ].refresh(ro_task) _update_refresh(new_status) @@ -2824,7 +2843,7 @@ class NsWorker(threading.Thread): """ ro_task = self._get_db_task() if ro_task: - self.logger.warning("Task to process: {}".format(ro_task)) + self.logger.debug("Task to process: {}".format(ro_task)) time.sleep(1) self._process_pending_tasks(ro_task) busy = True