X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=6336d9e60f50d5174228ec788f8016614f3f44a9;hb=93f4addd7625efbb4e0e9042c65908e6fb5714f6;hp=94af5a17836364606f551e73682296e13a6dcfcd;hpb=884862293ce112b057813f5811ac4f6adbb5f8c2;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 94af5a17..6336d9e6 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -24,24 +24,25 @@ A single ro_task refers to a VIM element (flavor, image, network, ...). A ro_task can contain several 'tasks', each one with a target, where to store the results """ -import logging -import queue -import threading -import time -import yaml from copy import deepcopy from http import HTTPStatus +import logging from os import mkdir -from pkg_resources import iter_entry_points +import queue from shutil import rmtree +import threading +import time +import traceback +from typing import Dict from unittest.mock import Mock -# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version +from importlib_metadata import entry_points from osm_common.dbbase import DbException -from osm_ro_plugin.vim_dummy import VimDummyConnector -from osm_ro_plugin.sdn_dummy import SdnDummyConnector -from osm_ro_plugin import vimconn, sdnconn from osm_ng_ro.vim_admin import LockRenew +from osm_ro_plugin import sdnconn, vimconn +from osm_ro_plugin.sdn_dummy import SdnDummyConnector +from osm_ro_plugin.vim_dummy import VimDummyConnector +import yaml __author__ = "Alfonso Tierno" @@ -125,6 +126,8 @@ class VimInteractionNet(VimInteractionBase): created = False created_items = {} target_vim = self.my_vims[ro_task["target_id"]] + mgmtnet = False + mgmtnet_defined_in_vim = False try: # FIND @@ -132,13 +135,15 @@ class VimInteractionNet(VimInteractionBase): # if management, get configuration of VIM if task["find_params"].get("filter_dict"): vim_filter = task["find_params"]["filter_dict"] - # mamagement network + # management network elif task["find_params"].get("mgmt"): + mgmtnet = True if deep_get( self.db_vims[ro_task["target_id"]], "config", "management_network_id", ): + mgmtnet_defined_in_vim = True vim_filter = { "id": self.db_vims[ro_task["target_id"]]["config"][ "management_network_id" @@ -149,6 +154,7 @@ class VimInteractionNet(VimInteractionBase): "config", "management_network_name", ): + mgmtnet_defined_in_vim = True vim_filter = { "name": self.db_vims[ro_task["target_id"]]["config"][ "management_network_name" @@ -163,11 +169,29 @@ class VimInteractionNet(VimInteractionBase): vim_nets = target_vim.get_network_list(vim_filter) if not vim_nets and not task.get("params"): - raise NsWorkerExceptionNotFound( - "Network not found with this criteria: '{}'".format( - task.get("find_params") + # If there is mgmt-network in the descriptor, + # there is no mapping of that network to a VIM network in the descriptor, + # also there is no mapping in the "--config" parameter or at VIM creation; + # that mgmt-network will be created. + if mgmtnet and not mgmtnet_defined_in_vim: + net_name = ( + vim_filter.get("name") + if vim_filter.get("name") + else vim_filter.get("id")[:16] + ) + vim_net_id, created_items = target_vim.new_network( + net_name, None + ) + self.logger.debug( + "Created mgmt network vim_net_id: {}".format(vim_net_id) + ) + created = True + else: + raise NsWorkerExceptionNotFound( + "Network not found with this criteria: '{}'".format( + task.get("find_params") + ) ) - ) elif len(vim_nets) > 1: raise NsWorkerException( "More than one network found with this criteria: '{}'".format( @@ -348,6 +372,23 @@ class VimInteractionVdu(VimInteractionBase): if params_copy["flavor_id"].startswith("TASK-"): params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] + affinity_group_list = params_copy["affinity_group_list"] + for affinity_group in affinity_group_list: + # change task_id into affinity_group_id + if "affinity_group_id" in affinity_group and affinity_group[ + "affinity_group_id" + ].startswith("TASK-"): + affinity_group_id = task_depends[ + affinity_group["affinity_group_id"] + ] + + if not affinity_group_id: + raise NsWorkerException( + "found for {}".format(affinity_group["affinity_group_id"]) + ) + + affinity_group["affinity_group_id"] = affinity_group_id + vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] @@ -448,8 +489,10 @@ class VimInteractionVdu(VimInteractionBase): vim_info_info = yaml.safe_load(vim_info["vim_info"]) if vim_info_info.get("name"): vim_info["name"] = vim_info_info["name"] - except Exception: - pass + except Exception as vim_info_error: + self.logger.exception( + f"{vim_info_error} occured while getting the vim_info from yaml" + ) except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error( @@ -557,6 +600,7 @@ class VimInteractionVdu(VimInteractionBase): except (vimconn.VimConnException, NsWorkerException) as e: retries += 1 + self.logger.debug(traceback.format_exc()) if retries < self.max_retries_inject_ssh_key: return ( "BUILD", @@ -596,7 +640,7 @@ class VimInteractionImage(VimInteractionBase): ) elif len(vim_images) > 1: raise NsWorkerException( - "More than one network found with this criteria: '{}'".format( + "More than one image found with this criteria: '{}'".format( task["find_params"] ) ) @@ -688,7 +732,7 @@ class VimInteractionFlavor(VimInteractionBase): flavor_data = task["find_params"]["flavor_data"] vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) except vimconn.VimConnNotFoundException: - pass + self.logger.exception("VimConnNotFoundException occured.") if not vim_flavor_id and task.get("params"): # CREATE @@ -723,6 +767,112 @@ class VimInteractionFlavor(VimInteractionBase): return "FAILED", ro_vim_item_update +class VimInteractionAffinityGroup(VimInteractionBase): + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + affinity_group_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None, + } + + try: + if affinity_group_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_affinity_group(affinity_group_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format( + ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-affinity-or-anti-affinity-group={} {}".format( + task_id, + ro_task["target_id"], + affinity_group_vim_id, + ro_vim_item_update_ok.get("vim_details", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + + try: + affinity_group_vim_id = None + affinity_group_data = None + + if task.get("params"): + affinity_group_data = task["params"].get("affinity_group_data") + + if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"): + try: + param_affinity_group_id = task["params"]["affinity_group_data"].get( + "vim-affinity-group-id" + ) + affinity_group_vim_id = target_vim.get_affinity_group( + param_affinity_group_id + ).get("id") + except vimconn.VimConnNotFoundException: + self.logger.error( + "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}" + "could not be found at VIM. Creating a new one.".format( + task_id, ro_task["target_id"], param_affinity_group_id + ) + ) + + if not affinity_group_vim_id and affinity_group_data: + affinity_group_vim_id = target_vim.new_affinity_group( + affinity_group_data + ) + created = True + + ro_vim_item_update = { + "vim_id": affinity_group_vim_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + } + self.logger.debug( + "task={} {} new-affinity-or-anti-affinity-group={} created={}".format( + task_id, ro_task["target_id"], affinity_group_vim_id, created + ) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error( + "task={} vim={} new-affinity-or-anti-affinity-group:" + " {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + + return "FAILED", ro_vim_item_update + + class VimInteractionSdnNet(VimInteractionBase): @staticmethod def _match_pci(port_pci, mapping): @@ -827,7 +977,6 @@ class VimInteractionSdnNet(VimInteractionBase): return self.new(ro_task, task_create_index, None) def new(self, ro_task, task_index, task_depends): - task = ro_task["tasks"][task_index] task_id = task["task_id"] target_vim = self.my_vims[ro_task["target_id"]] @@ -845,11 +994,15 @@ class VimInteractionSdnNet(VimInteractionBase): try: # CREATE params = task["params"] - vlds_to_connect = params["vlds"] - associated_vim = params["target_vim"] + vlds_to_connect = params.get("vlds", []) + associated_vim = params.get("target_vim") # external additional ports additional_ports = params.get("sdn-ports") or () - _, _, vim_account_id = associated_vim.partition(":") + _, _, vim_account_id = ( + (None, None, None) + if associated_vim is None + else associated_vim.partition(":") + ) if associated_vim: # get associated VIM @@ -1114,15 +1267,39 @@ class VimInteractionSdnNet(VimInteractionBase): return "DONE", ro_vim_item_update_ok -class NsWorker(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute - REFRESH_ERROR = 600 - REFRESH_IMAGE = 3600 * 10 - REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 100 - terminate = False +class ConfigValidate: + def __init__(self, config: Dict): + self.conf = config + + @property + def active(self): + # default 1 min, allowed >= 60 or -1, -1 disables periodic checks + if ( + self.conf["period"]["refresh_active"] >= 60 + or self.conf["period"]["refresh_active"] == -1 + ): + return self.conf["period"]["refresh_active"] + + return 60 + + @property + def build(self): + return self.conf["period"]["refresh_build"] + + @property + def image(self): + return self.conf["period"]["refresh_image"] + + @property + def error(self): + return self.conf["period"]["refresh_error"] + + @property + def queue_size(self): + return self.conf["period"]["queue_size"] + +class NsWorker(threading.Thread): def __init__(self, worker_index, config, plugins, db): """ @@ -1137,7 +1314,9 @@ class NsWorker(threading.Thread): self.plugin_name = "unknown" self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index - self.task_queue = queue.Queue(self.QUEUE_SIZE) + # refresh periods for created items + self.refresh_config = ConfigValidate(config) + self.task_queue = queue.Queue(self.refresh_config.queue_size) # targetvim: vimplugin class self.my_vims = {} # targetvim: vim information from database @@ -1158,6 +1337,9 @@ class NsWorker(threading.Thread): "sdn_net": VimInteractionSdnNet( self.db, self.my_vims, self.db_vims, self.logger ), + "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( + self.db, self.my_vims, self.db_vims, self.logger + ), } self.time_last_task_processed = None # lists of tasks to delete because nsrs or vnfrs has been deleted from db @@ -1204,7 +1386,9 @@ class NsWorker(threading.Thread): try: mkdir(file_name) except FileExistsError: - pass + self.logger.exception( + "FileExistsError occured while processing vim_config." + ) file_name = file_name + "/ca_cert" @@ -1229,8 +1413,8 @@ class NsWorker(threading.Thread): return self.plugins[name] try: - for v in iter_entry_points("osm_ro{}.plugins".format(type), name): - self.plugins[name] = v.load() + for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): + self.plugins[name] = ep.load() except Exception as e: raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e)) @@ -1257,7 +1441,8 @@ class NsWorker(threading.Thread): self.logger.info("Unloaded {}".format(target_id)) rmtree("{}:{}".format(target_id, self.worker_index)) except FileNotFoundError: - pass # this is raised by rmtree if folder does not exist + # This is raised by rmtree if folder does not exist. + self.logger.exception("FileNotFoundError occured while unloading VIM.") except Exception as e: self.logger.error("Cannot unload {}: {}".format(target_id, e)) @@ -1429,14 +1614,17 @@ class NsWorker(threading.Thread): persistent_info={}, ) else: # sdn - plugin_name = "rosdn_" + vim["type"] + plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) step = "Loading plugin '{}'".format(plugin_name) vim_module_conn = self._load_plugin(plugin_name, "sdn") step = "Loading {}'".format(target_id) wim = deepcopy(vim) wim_config = wim.pop("config", {}) or {} wim["uuid"] = wim["_id"] - wim["wim_url"] = wim["url"] + if "url" in wim and "wim_url" not in wim: + wim["wim_url"] = wim["url"] + elif "url" not in wim and "wim_url" in wim: + wim["url"] = wim["wim_url"] if wim.get("dpid"): wim_config["dpid"] = wim.pop("dpid") @@ -1480,6 +1668,24 @@ class NsWorker(threading.Thread): try: while True: + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + self._log_ro_task( + None, + None, + None, + "TASK_WF", + "task_locked_time=" + + str(self.task_locked_time) + + " " + + "time_last_task_processed=" + + str(self.time_last_task_processed) + + " " + + "now=" + + str(now), + ) + """ locked = self.db.set_one( "ro_tasks", q_filter={ @@ -1487,6 +1693,7 @@ class NsWorker(threading.Thread): "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed, + "to_check_at.gt": -1, }, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False, @@ -1520,6 +1727,128 @@ class NsWorker(threading.Thread): return None + def _get_db_all_tasks(self): + """ + Read all content of table ro_tasks to log it + :return: None + """ + try: + # Checking the content of the BD: + + # read and return + ro_task = self.db.get_list("ro_tasks") + for rt in ro_task: + self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS") + return ro_task + + except DbException as e: + self.logger.error("Database exception at _get_db_all_tasks: {}".format(e)) + except Exception as e: + self.logger.critical( + "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True + ) + + return None + + def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event): + """ + Generate a log with the following format: + + Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by; + target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id; + task_array_index;task_id;task_action;task_item;task_args + + Example: + + TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013; + 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a + ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False, + 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None, + 'vim_details': None, 'refresh_at': None};1;SCHEDULED; + 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2; + CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}} + """ + try: + line = [] + i = 0 + if ro_task is not None and isinstance(ro_task, dict): + for t in ro_task["tasks"]: + line.clear() + line.append(mark) + line.append(event) + line.append(ro_task.get("_id", "")) + line.append(str(ro_task.get("locked_at", ""))) + line.append(str(ro_task.get("modified_at", ""))) + line.append(str(ro_task.get("created_at", ""))) + line.append(str(ro_task.get("to_check_at", ""))) + line.append(str(ro_task.get("locked_by", ""))) + line.append(str(ro_task.get("target_id", ""))) + line.append(str(ro_task.get("vim_info", {}).get("refresh_at", ""))) + line.append(str(ro_task.get("vim_info", ""))) + line.append(str(ro_task.get("tasks", ""))) + if isinstance(t, dict): + line.append(str(t.get("status", ""))) + line.append(str(t.get("action_id", ""))) + line.append(str(i)) + line.append(str(t.get("task_id", ""))) + line.append(str(t.get("action", ""))) + line.append(str(t.get("item", ""))) + line.append(str(t.get("find_params", ""))) + line.append(str(t.get("params", ""))) + else: + line.extend([""] * 2) + line.append(str(i)) + line.extend([""] * 5) + + i += 1 + self.logger.debug(";".join(line)) + elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict): + i = 0 + while True: + st = "tasks.{}.status".format(i) + if st not in db_ro_task_update: + break + line.clear() + line.append(mark) + line.append(event) + line.append(db_ro_task_update.get("_id", "")) + line.append(str(db_ro_task_update.get("locked_at", ""))) + line.append(str(db_ro_task_update.get("modified_at", ""))) + line.append("") + line.append(str(db_ro_task_update.get("to_check_at", ""))) + line.append(str(db_ro_task_update.get("locked_by", ""))) + line.append("") + line.append(str(db_ro_task_update.get("vim_info.refresh_at", ""))) + line.append("") + line.append(str(db_ro_task_update.get("vim_info", ""))) + line.append(str(str(db_ro_task_update).count(".status"))) + line.append(db_ro_task_update.get(st, "")) + line.append("") + line.append(str(i)) + line.extend([""] * 3) + i += 1 + self.logger.debug(";".join(line)) + + elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict): + line.clear() + line.append(mark) + line.append(event) + line.append(db_ro_task_delete.get("_id", "")) + line.append("") + line.append(db_ro_task_delete.get("modified_at", "")) + line.extend([""] * 13) + self.logger.debug(";".join(line)) + + else: + line.clear() + line.append(mark) + line.append(event) + line.extend([""] * 16) + self.logger.debug(";".join(line)) + + except Exception as e: + self.logger.error("Error logging ro_task: {}".format(e)) + def _delete_task(self, ro_task, task_index, task_depends, db_update): """ Determine if this task need to be done or superseded @@ -1662,6 +1991,66 @@ class NsWorker(threading.Thread): return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) + def update_vm_refresh(self, ro_task): + """Enables the VM status updates if self.refresh_config.active parameter + is not -1 and then updates the DB accordingly + + """ + try: + self.logger.debug("Checking if VM status update config") + next_refresh = time.time() + next_refresh = self._get_next_refresh(ro_task, next_refresh) + + if next_refresh != -1: + db_ro_task_update = {} + now = time.time() + next_check_at = now + (24 * 60 * 60) + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + db_ro_task_update["to_check_at"] = next_check_at + + self.logger.debug( + "Finding tasks which to be updated to enable VM status updates" + ) + refresh_tasks = self.db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": "DONE", + "to_check_at.lt": 0, + }, + ) + self.logger.debug("Updating tasks to change the to_check_at status") + for task in refresh_tasks: + q_filter = { + "_id": task["_id"], + } + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) + + except Exception as e: + self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + + def _get_next_refresh(self, ro_task: dict, next_refresh: float): + """Decide the next_refresh according to vim type and refresh config period. + Args: + ro_task (dict): ro_task details + next_refresh (float): next refresh time as epoch format + + Returns: + next_refresh (float) -1 if vm updates are disabled or vim type is openstack. + """ + target_vim = ro_task["target_id"] + vim_type = self.db_vims[target_vim]["vim_type"] + if self.refresh_config.active == -1 or vim_type == "openstack": + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + return next_refresh + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -1679,19 +2068,26 @@ class NsWorker(threading.Thread): next_refresh = time.time() if task["item"] in ("image", "flavor"): - next_refresh += self.REFRESH_IMAGE + next_refresh += self.refresh_config.image elif new_status == "BUILD": - next_refresh += self.REFRESH_BUILD + next_refresh += self.refresh_config.build elif new_status == "DONE": - next_refresh += self.REFRESH_ACTIVE + next_refresh = self._get_next_refresh(ro_task, next_refresh) else: - next_refresh += self.REFRESH_ERROR + next_refresh += self.refresh_config.error next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh ro_task["vim_info"]["refresh_at"] = next_refresh try: + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") + """ + # Check if vim status refresh is enabled again + self.update_vm_refresh(ro_task) # 0: get task_status_create lock_object = None task_status_create = None @@ -1799,7 +2195,10 @@ class NsWorker(threading.Thread): ) if task["action"] == "DELETE": - (new_status, db_vim_info_update,) = self._delete_task( + ( + new_status, + db_vim_info_update, + ) = self._delete_task( ro_task, task_index, task_depends, db_ro_task_update ) new_status = ( @@ -1845,14 +2244,17 @@ class NsWorker(threading.Thread): # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ( - ro_task["vim_info"]["refresh_at"] - and now > ro_task["vim_info"]["refresh_at"] - ): + refresh_at = ro_task["vim_info"]["refresh_at"] + if refresh_at and refresh_at != -1 and now > refresh_at: new_status, db_vim_info_update = self.item2class[ task["item"] ].refresh(ro_task) _update_refresh(new_status) + else: + # The refresh is updated to avoid set the value of "refresh_at" to + # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD, + # because it can happen that in this case the task is never processed + _update_refresh(task["status"]) except Exception as e: new_status = "FAILED" @@ -1937,6 +2339,14 @@ class NsWorker(threading.Thread): db_ro_task_update["modified_at"] = now db_ro_task_update["to_check_at"] = next_check_at + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + db_ro_task_update_log = db_ro_task_update.copy() + db_ro_task_update_log["_id"] = q_filter["_id"] + self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK") + """ + if not self.db.set_one( "ro_tasks", update_dict=db_ro_task_update, @@ -1945,6 +2355,17 @@ class NsWorker(threading.Thread): ): del db_ro_task_update["to_check_at"] del q_filter["to_check_at"] + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + self._log_ro_task( + None, + db_ro_task_update_log, + None, + "TASK_WF", + "SET_TASK " + str(q_filter), + ) + """ self.db.set_one( "ro_tasks", q_filter=q_filter, @@ -2159,6 +2580,11 @@ class NsWorker(threading.Thread): if self.tasks_to_delete: self._process_delete_db_tasks() busy = False + """ + # Log RO tasks only when loglevel is DEBUG + if self.logger.getEffectiveLevel() == logging.DEBUG: + _ = self._get_db_all_tasks() + """ ro_task = self._get_db_task() if ro_task: self._process_pending_tasks(ro_task)