X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=4a7de944c83aa833135023a1c7ee34c29865ec93;hb=06e914fba9a8f3b8861a98f175c6a6975882be00;hp=767382d99ee0f4c0cf007f585dedae53543cb859;hpb=42eb06f7a44e86a46c8310eccd81e1d628e649e4;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 767382d9..4a7de944 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -17,7 +17,7 @@ # ## -"""" +""" This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. The tasks are stored at database in table ro_tasks A single ro_task refers to a VIM element (flavor, image, network, ...). @@ -27,18 +27,20 @@ A ro_task can contain several 'tasks', each one with a target, where to store th from copy import deepcopy from http import HTTPStatus import logging -from os import mkdir +from os import makedirs +from os import path import queue -from shutil import rmtree import threading import time import traceback +from typing import Dict from unittest.mock import Mock from importlib_metadata import entry_points from osm_common.dbbase import DbException from osm_ng_ro.vim_admin import LockRenew -from osm_ro_plugin import sdnconn, vimconn +from osm_ro_plugin import sdnconn +from osm_ro_plugin import vimconn from osm_ro_plugin.sdn_dummy import SdnDummyConnector from osm_ro_plugin.vim_dummy import VimDummyConnector import yaml @@ -54,7 +56,7 @@ def deep_get(target_dict, *args, **kwargs): :param target_dict: dictionary to be read :param args: list of keys to read from target_dict :param kwargs: only can contain default=value to return if key is not present in the nested dictionary - :return: The wanted value if exist, None or default otherwise + :return: The wanted value if exists, None or default otherwise """ for key in args: if not isinstance(target_dict, dict) or key not in target_dict: @@ -343,9 +345,7 @@ class VimInteractionVdu(VimInteractionBase): task = ro_task["tasks"][task_index] task_id = task["task_id"] created = False - created_items = {} target_vim = self.my_vims[ro_task["target_id"]] - try: created = True params = task["params"] @@ -387,7 +387,6 @@ class VimInteractionVdu(VimInteractionBase): ) affinity_group["affinity_group_id"] = affinity_group_id - vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] @@ -415,6 +414,7 @@ class VimInteractionVdu(VimInteractionBase): return "BUILD", ro_vim_item_update except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.debug(traceback.format_exc()) self.logger.error( "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) ) @@ -502,8 +502,10 @@ class VimInteractionVdu(VimInteractionBase): vim_info_info = yaml.safe_load(vim_info["vim_info"]) if vim_info_info.get("name"): vim_info["name"] = vim_info_info["name"] - except Exception: - pass + except Exception as vim_info_error: + self.logger.exception( + f"{vim_info_error} occured while getting the vim_info from yaml" + ) except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error( @@ -640,8 +642,11 @@ class VimInteractionImage(VimInteractionBase): try: # FIND + vim_image_id = "" if task.get("find_params"): - vim_images = target_vim.get_image_list(**task["find_params"]) + vim_images = target_vim.get_image_list( + task["find_params"].get("filter_dict", {}) + ) if not vim_images: raise NsWorkerExceptionNotFound( @@ -660,7 +665,7 @@ class VimInteractionImage(VimInteractionBase): ro_vim_item_update = { "vim_id": vim_image_id, - "vim_status": "DONE", + "vim_status": "ACTIVE", "created": created, "created_items": created_items, "vim_details": None, @@ -686,6 +691,112 @@ class VimInteractionImage(VimInteractionBase): return "FAILED", ro_vim_item_update +class VimInteractionSharedVolume(VimInteractionBase): + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + shared_volume_vim_id = ro_task["vim_info"]["vim_id"] + created_items = ro_task["vim_info"]["created_items"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_message": "DELETED", + "vim_id": None, + } + if created_items and created_items.get(shared_volume_vim_id).get("keep"): + ro_vim_item_update_ok = { + "vim_status": "ACTIVE", + "created": False, + "vim_message": None, + } + return "DONE", ro_vim_item_update_ok + try: + if shared_volume_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_shared_volumes(shared_volume_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_message"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-shared-volume={}: {}".format( + ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_message": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-shared-volume={} {}".format( + task_id, + ro_task["target_id"], + shared_volume_vim_id, + ro_vim_item_update_ok.get("vim_message", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + shared_volume_vim_id = None + shared_volume_data = None + + if task.get("params"): + shared_volume_data = task["params"] + + if shared_volume_data: + self.logger.info( + f"Creating the new shared_volume for {shared_volume_data}\n" + ) + ( + shared_volume_name, + shared_volume_vim_id, + ) = target_vim.new_shared_volumes(shared_volume_data) + created = True + created_items[shared_volume_vim_id] = { + "name": shared_volume_name, + "keep": shared_volume_data.get("keep"), + } + + ro_vim_item_update = { + "vim_id": shared_volume_vim_id, + "vim_status": "ACTIVE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} new-shared-volume={} created={}".format( + task_id, ro_task["target_id"], shared_volume_vim_id, created + ) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} new-shared-volume:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update + + class VimInteractionFlavor(VimInteractionBase): def delete(self, ro_task, task_index): task = ro_task["tasks"][task_index] @@ -734,17 +845,20 @@ class VimInteractionFlavor(VimInteractionBase): created = False created_items = {} target_vim = self.my_vims[ro_task["target_id"]] - try: # FIND vim_flavor_id = None - if task.get("find_params"): + if task.get("find_params", {}).get("vim_flavor_id"): + vim_flavor_id = task["find_params"]["vim_flavor_id"] + elif task.get("find_params", {}).get("flavor_data"): try: flavor_data = task["find_params"]["flavor_data"] vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) - except vimconn.VimConnNotFoundException: - pass + except vimconn.VimConnNotFoundException as flavor_not_found_msg: + self.logger.warning( + f"VimConnNotFoundException occured: {flavor_not_found_msg}" + ) if not vim_flavor_id and task.get("params"): # CREATE @@ -754,7 +868,7 @@ class VimInteractionFlavor(VimInteractionBase): ro_vim_item_update = { "vim_id": vim_flavor_id, - "vim_status": "DONE", + "vim_status": "ACTIVE", "created": created, "created_items": created_items, "vim_details": None, @@ -832,6 +946,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): try: affinity_group_vim_id = None affinity_group_data = None + param_affinity_group_id = "" if task.get("params"): affinity_group_data = task["params"].get("affinity_group_data") @@ -860,7 +975,7 @@ class VimInteractionAffinityGroup(VimInteractionBase): ro_vim_item_update = { "vim_id": affinity_group_vim_id, - "vim_status": "DONE", + "vim_status": "ACTIVE", "created": created, "created_items": created_items, "vim_details": None, @@ -887,12 +1002,55 @@ class VimInteractionAffinityGroup(VimInteractionBase): return "FAILED", ro_vim_item_update +class VimInteractionUpdateVdu(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + vim_vm_id = "" + if task.get("params"): + vim_vm_id = task["params"].get("vim_vm_id") + action = task["params"].get("action") + context = {action: action} + target_vim.action_vminstance(vim_vm_id, context) + # created = True + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "ACTIVE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} vm-migration done".format(task_id, ro_task["target_id"]) + ) + return "DONE", ro_vim_item_update, db_task_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} VM Migration:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + class VimInteractionSdnNet(VimInteractionBase): @staticmethod def _match_pci(port_pci, mapping): """ - Check if port_pci matches with mapping - mapping can have brackets to indicate that several chars are accepted. e.g + Check if port_pci matches with mapping. + The mapping can have brackets to indicate that several chars are accepted. e.g pci '0000:af:10.1' matches with '0000:af:1[01].[1357]' :param port_pci: text :param mapping: text, can contain brackets to indicate several chars are available @@ -991,7 +1149,6 @@ class VimInteractionSdnNet(VimInteractionBase): return self.new(ro_task, task_create_index, None) def new(self, ro_task, task_index, task_depends): - task = ro_task["tasks"][task_index] task_id = task["task_id"] target_vim = self.my_vims[ro_task["target_id"]] @@ -1008,12 +1165,17 @@ class VimInteractionSdnNet(VimInteractionBase): try: # CREATE + db_vim = {} params = task["params"] - vlds_to_connect = params["vlds"] - associated_vim = params["target_vim"] + vlds_to_connect = params.get("vlds", []) + associated_vim = params.get("target_vim") # external additional ports additional_ports = params.get("sdn-ports") or () - _, _, vim_account_id = associated_vim.partition(":") + _, _, vim_account_id = ( + (None, None, None) + if associated_vim is None + else associated_vim.partition(":") + ) if associated_vim: # get associated VIM @@ -1291,6 +1453,7 @@ class VimInteractionMigration(VimInteractionBase): refreshed_vim_info = {} try: + vim_vm_id = "" if task.get("params"): vim_vm_id = task["params"].get("vim_vm_id") migrate_host = task["params"].get("migrate_host") @@ -1360,18 +1523,103 @@ class VimInteractionMigration(VimInteractionBase): return "FAILED", ro_vim_item_update, db_task_update -class NsWorker(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute - REFRESH_ERROR = 600 - REFRESH_IMAGE = 3600 * 10 - REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 100 - terminate = False +class VimInteractionResize(VimInteractionBase): + def exec(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + db_task_update = {"retries": 0} + created = False + target_flavor_uuid = None + created_items = {} + refreshed_vim_info = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + params = task["params"] + params_copy = deepcopy(params) + target_flavor_uuid = task_depends[params_copy["flavor_id"]] + vim_vm_id = "" + if task.get("params"): + self.logger.info("vim_vm_id %s", vim_vm_id) + + if target_flavor_uuid is not None: + resized_status = target_vim.resize_instance( + vim_vm_id, target_flavor_uuid + ) + + if resized_status: + # Refresh VM to get new vim_info + vm_to_refresh_list = [vim_vm_id] + vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) + refreshed_vim_info = vim_dict[vim_vm_id] + + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "ACTIVE", + "created": created, + "created_items": created_items, + "vim_details": None, + "vim_message": None, + } + + if refreshed_vim_info and refreshed_vim_info.get("status") not in ( + "ERROR", + "VIM_ERROR", + ): + ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"] + + self.logger.debug( + "task={} {} resize done".format(task_id, ro_task["target_id"]) + ) + return "DONE", ro_vim_item_update, db_task_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update, db_task_update + + +class ConfigValidate: + def __init__(self, config: Dict): + self.conf = config + + @property + def active(self): + # default 1 min, allowed >= 60 or -1, -1 disables periodic checks + if ( + self.conf["period"]["refresh_active"] >= 60 + or self.conf["period"]["refresh_active"] == -1 + ): + return self.conf["period"]["refresh_active"] + + return 60 + + @property + def build(self): + return self.conf["period"]["refresh_build"] + + @property + def image(self): + return self.conf["period"]["refresh_image"] + + @property + def error(self): + return self.conf["period"]["refresh_error"] + + @property + def queue_size(self): + return self.conf["period"]["queue_size"] + +class NsWorker(threading.Thread): def __init__(self, worker_index, config, plugins, db): """ - :param worker_index: thread index :param config: general configuration of RO, among others the process_id with the docker id where it runs :param plugins: global shared dict with the loaded plugins @@ -1383,7 +1631,9 @@ class NsWorker(threading.Thread): self.plugin_name = "unknown" self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index - self.task_queue = queue.Queue(self.QUEUE_SIZE) + # refresh periods for created items + self.refresh_config = ConfigValidate(config) + self.task_queue = queue.Queue(self.refresh_config.queue_size) # targetvim: vimplugin class self.my_vims = {} # targetvim: vim information from database @@ -1394,6 +1644,9 @@ class NsWorker(threading.Thread): self.db = db self.item2class = { "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger), + "shared-volumes": VimInteractionSharedVolume( + self.db, self.my_vims, self.db_vims, self.logger + ), "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger), "image": VimInteractionImage( self.db, self.my_vims, self.db_vims, self.logger @@ -1404,12 +1657,18 @@ class NsWorker(threading.Thread): "sdn_net": VimInteractionSdnNet( self.db, self.my_vims, self.db_vims, self.logger ), + "update": VimInteractionUpdateVdu( + self.db, self.my_vims, self.db_vims, self.logger + ), "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( self.db, self.my_vims, self.db_vims, self.logger ), "migrate": VimInteractionMigration( self.db, self.my_vims, self.db_vims, self.logger ), + "verticalscale": VimInteractionResize( + self.db, self.my_vims, self.db_vims, self.logger + ), } self.time_last_task_processed = None # lists of tasks to delete because nsrs or vnfrs has been deleted from db @@ -1437,7 +1696,7 @@ class NsWorker(threading.Thread): self.task_lock.release() return False - def _process_vim_config(self, target_id, db_vim): + def _process_vim_config(self, target_id: str, db_vim: dict) -> None: """ Process vim config, creating vim configuration files as ca_cert :param target_id: vim/sdn/wim + id @@ -1448,15 +1707,14 @@ class NsWorker(threading.Thread): return file_name = "" + work_dir = "/app/osm_ro/certs" try: if db_vim["config"].get("ca_cert_content"): - file_name = "{}:{}".format(target_id, self.worker_index) + file_name = f"{work_dir}/{target_id}:{self.worker_index}" - try: - mkdir(file_name) - except FileExistsError: - pass + if not path.isdir(file_name): + makedirs(file_name) file_name = file_name + "/ca_cert" @@ -1507,9 +1765,6 @@ class NsWorker(threading.Thread): self.vim_targets.remove(target_id) self.logger.info("Unloaded {}".format(target_id)) - rmtree("{}:{}".format(target_id, self.worker_index)) - except FileNotFoundError: - pass # this is raised by rmtree if folder does not exist except Exception as e: self.logger.error("Cannot unload {}: {}".format(target_id, e)) @@ -1533,6 +1788,7 @@ class NsWorker(threading.Thread): if target == "wim" else "sdns" ) + error_text = "" try: step = "Getting {} from db".format(target_id) @@ -1627,7 +1883,7 @@ class NsWorker(threading.Thread): """ Load or reload a vim_account, sdn_controller or wim_account. Read content from database, load the plugin if not loaded. - In case of error loading the plugin, it load a failing VIM_connector + In case of error loading the plugin, it loads a failing VIM_connector It fills self db_vims dictionary, my_vims dictionary and vim_targets list :param target_id: Contains type:_id; where type can be 'vim', ... :return: None if ok, descriptive text if error @@ -1642,9 +1898,9 @@ class NsWorker(threading.Thread): ) plugin_name = "" vim = None + step = "Getting {}={} from db".format(target, _id) try: - step = "Getting {}={} from db".format(target, _id) # TODO process for wim, sdnc, ... vim = self.db.get_one(target_database, {"_id": _id}) @@ -1681,14 +1937,17 @@ class NsWorker(threading.Thread): persistent_info={}, ) else: # sdn - plugin_name = "rosdn_" + vim["type"] + plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) step = "Loading plugin '{}'".format(plugin_name) vim_module_conn = self._load_plugin(plugin_name, "sdn") step = "Loading {}'".format(target_id) wim = deepcopy(vim) wim_config = wim.pop("config", {}) or {} wim["uuid"] = wim["_id"] - wim["wim_url"] = wim["url"] + if "url" in wim and "wim_url" not in wim: + wim["wim_url"] = wim["url"] + elif "url" not in wim and "wim_url" in wim: + wim["url"] = wim["wim_url"] if wim.get("dpid"): wim_config["dpid"] = wim.pop("dpid") @@ -1757,6 +2016,7 @@ class NsWorker(threading.Thread): "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed, + "to_check_at.gt": -1, }, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False, @@ -1790,128 +2050,6 @@ class NsWorker(threading.Thread): return None - def _get_db_all_tasks(self): - """ - Read all content of table ro_tasks to log it - :return: None - """ - try: - # Checking the content of the BD: - - # read and return - ro_task = self.db.get_list("ro_tasks") - for rt in ro_task: - self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS") - return ro_task - - except DbException as e: - self.logger.error("Database exception at _get_db_all_tasks: {}".format(e)) - except Exception as e: - self.logger.critical( - "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True - ) - - return None - - def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event): - """ - Generate a log with the following format: - - Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by; - target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id; - task_array_index;task_id;task_action;task_item;task_args - - Example: - - TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013; - 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a - ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False, - 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None, - 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED; - 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2; - CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}} - """ - try: - line = [] - i = 0 - if ro_task is not None and isinstance(ro_task, dict): - for t in ro_task["tasks"]: - line.clear() - line.append(mark) - line.append(event) - line.append(ro_task.get("_id", "")) - line.append(str(ro_task.get("locked_at", ""))) - line.append(str(ro_task.get("modified_at", ""))) - line.append(str(ro_task.get("created_at", ""))) - line.append(str(ro_task.get("to_check_at", ""))) - line.append(str(ro_task.get("locked_by", ""))) - line.append(str(ro_task.get("target_id", ""))) - line.append(str(ro_task.get("vim_info", {}).get("refresh_at", ""))) - line.append(str(ro_task.get("vim_info", ""))) - line.append(str(ro_task.get("tasks", ""))) - if isinstance(t, dict): - line.append(str(t.get("status", ""))) - line.append(str(t.get("action_id", ""))) - line.append(str(i)) - line.append(str(t.get("task_id", ""))) - line.append(str(t.get("action", ""))) - line.append(str(t.get("item", ""))) - line.append(str(t.get("find_params", ""))) - line.append(str(t.get("params", ""))) - else: - line.extend([""] * 2) - line.append(str(i)) - line.extend([""] * 5) - - i += 1 - self.logger.debug(";".join(line)) - elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict): - i = 0 - while True: - st = "tasks.{}.status".format(i) - if st not in db_ro_task_update: - break - line.clear() - line.append(mark) - line.append(event) - line.append(db_ro_task_update.get("_id", "")) - line.append(str(db_ro_task_update.get("locked_at", ""))) - line.append(str(db_ro_task_update.get("modified_at", ""))) - line.append("") - line.append(str(db_ro_task_update.get("to_check_at", ""))) - line.append(str(db_ro_task_update.get("locked_by", ""))) - line.append("") - line.append(str(db_ro_task_update.get("vim_info.refresh_at", ""))) - line.append("") - line.append(str(db_ro_task_update.get("vim_info", ""))) - line.append(str(str(db_ro_task_update).count(".status"))) - line.append(db_ro_task_update.get(st, "")) - line.append("") - line.append(str(i)) - line.extend([""] * 3) - i += 1 - self.logger.debug(";".join(line)) - - elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict): - line.clear() - line.append(mark) - line.append(event) - line.append(db_ro_task_delete.get("_id", "")) - line.append("") - line.append(db_ro_task_delete.get("modified_at", "")) - line.extend([""] * 13) - self.logger.debug(";".join(line)) - - else: - line.clear() - line.append(mark) - line.append(event) - line.extend([""] * 16) - self.logger.debug(";".join(line)) - - except Exception as e: - self.logger.error("Error logging ro_task: {}".format(e)) - def _delete_task(self, ro_task, task_index, task_depends, db_update): """ Determine if this task need to be done or superseded @@ -1923,7 +2061,7 @@ class NsWorker(threading.Thread): "created_items", False ) - self.logger.warning("Needed delete: {}".format(needed_delete)) + self.logger.debug("Needed delete: {}".format(needed_delete)) if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? @@ -1947,7 +2085,7 @@ class NsWorker(threading.Thread): needed_delete = False if needed_delete: - self.logger.warning( + self.logger.debug( "Deleting ro_task={} task_index={}".format(ro_task, task_index) ) return self.item2class[my_task["item"]].delete(ro_task, task_index) @@ -1971,7 +2109,6 @@ class NsWorker(threading.Thread): """ my_task = ro_task["tasks"][task_index] task_id = my_task["task_id"] - task_status = None if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? @@ -2052,13 +2189,73 @@ class NsWorker(threading.Thread): fail_on_empty=False, ) - self.logger.warning("ro_task_dependency={}".format(ro_task_dependency)) + self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) if ro_task_dependency: for task_index, task in enumerate(ro_task_dependency["tasks"]): if task["task_id"] == task_id: return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) + def update_vm_refresh(self, ro_task): + """Enables the VM status updates if self.refresh_config.active parameter + is not -1 and then updates the DB accordingly + + """ + try: + self.logger.debug("Checking if VM status update config") + next_refresh = time.time() + next_refresh = self._get_next_refresh(ro_task, next_refresh) + + if next_refresh != -1: + db_ro_task_update = {} + now = time.time() + next_check_at = now + (24 * 60 * 60) + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + db_ro_task_update["to_check_at"] = next_check_at + + self.logger.debug( + "Finding tasks which to be updated to enable VM status updates" + ) + refresh_tasks = self.db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": "DONE", + "to_check_at.lt": 0, + }, + ) + self.logger.debug("Updating tasks to change the to_check_at status") + for task in refresh_tasks: + q_filter = { + "_id": task["_id"], + } + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) + + except Exception as e: + self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + + def _get_next_refresh(self, ro_task: dict, next_refresh: float): + """Decide the next_refresh according to vim type and refresh config period. + Args: + ro_task (dict): ro_task details + next_refresh (float): next refresh time as epoch format + + Returns: + next_refresh (float) -1 if vm updates are disabled or vim type is openstack. + """ + target_vim = ro_task["target_id"] + vim_type = self.db_vims[target_vim]["vim_type"] + if self.refresh_config.active == -1 or vim_type == "openstack": + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + return next_refresh + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -2076,13 +2273,13 @@ class NsWorker(threading.Thread): next_refresh = time.time() if task["item"] in ("image", "flavor"): - next_refresh += self.REFRESH_IMAGE + next_refresh += self.refresh_config.image elif new_status == "BUILD": - next_refresh += self.REFRESH_BUILD + next_refresh += self.refresh_config.build elif new_status == "DONE": - next_refresh += self.REFRESH_ACTIVE + next_refresh = self._get_next_refresh(ro_task, next_refresh) else: - next_refresh += self.REFRESH_ERROR + next_refresh += self.refresh_config.error next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh @@ -2094,6 +2291,8 @@ class NsWorker(threading.Thread): if self.logger.getEffectiveLevel() == logging.DEBUG: self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") """ + # Check if vim status refresh is enabled again + self.update_vm_refresh(ro_task) # 0: get task_status_create lock_object = None task_status_create = None @@ -2139,6 +2338,7 @@ class NsWorker(threading.Thread): task_path = "tasks.{}.status".format(task_index) try: db_vim_info_update = None + dependency_ro_task = {} if task["status"] == "SCHEDULED": # check if tasks that this depends on have been completed @@ -2154,7 +2354,7 @@ class NsWorker(threading.Thread): dependency_task = dependency_ro_task["tasks"][ dependency_task_index ] - self.logger.warning( + self.logger.debug( "dependency_ro_task={} dependency_task_index={}".format( dependency_ro_task, dependency_task_index ) @@ -2209,9 +2409,11 @@ class NsWorker(threading.Thread): lock_object = LockRenew.add_lock_object( "ro_tasks", ro_task, self ) - if task["action"] == "DELETE": - (new_status, db_vim_info_update,) = self._delete_task( + ( + new_status, + db_vim_info_update, + ) = self._delete_task( ro_task, task_index, task_depends, db_ro_task_update ) new_status = ( @@ -2254,14 +2456,14 @@ class NsWorker(threading.Thread): new_status, db_vim_info_update = self.item2class[ task["item"] ].new(ro_task, task_index, task_depends) - # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ( - ro_task["vim_info"]["refresh_at"] - and now > ro_task["vim_info"]["refresh_at"] - ): - new_status, db_vim_info_update = self.item2class[ + refresh_at = ro_task["vim_info"]["refresh_at"] + if refresh_at and refresh_at != -1 and now > refresh_at: + ( + new_status, + db_vim_info_update, + ) = self.item2class[ task["item"] ].refresh(ro_task) _update_refresh(new_status) @@ -2337,7 +2539,7 @@ class NsWorker(threading.Thread): lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time, ] - # locked_at contains two times to avoid race condition. In case the lock has been renew, it will + # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will # contain exactly locked_at + self.task_locked_time LockRenew.remove_lock_object(lock_object) @@ -2349,7 +2551,7 @@ class NsWorker(threading.Thread): # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, # outside this task (by ro_nbi) do not update it db_ro_task_update["locked_by"] = None - # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked + # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked db_ro_task_update["locked_at"] = int(now - self.task_locked_time) db_ro_task_update["modified_at"] = now db_ro_task_update["to_check_at"] = next_check_at @@ -2625,7 +2827,7 @@ class NsWorker(threading.Thread): """ ro_task = self._get_db_task() if ro_task: - self.logger.warning("Task to process: {}".format(ro_task)) + self.logger.debug("Task to process: {}".format(ro_task)) time.sleep(1) self._process_pending_tasks(ro_task) busy = True