X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=681128ec34006ea4c20a1419c3358645394f8c8a;hp=0ec845257225c62592fb91d6ea1bd6daad265d4f;hb=becd083422f214e1480a8ecfadc748af2ea41703;hpb=55fa0bb312eebc5899abca6124b2114db1682f7b diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 0ec84525..681128ec 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -24,24 +24,25 @@ A single ro_task refers to a VIM element (flavor, image, network, ...). A ro_task can contain several 'tasks', each one with a target, where to store the results """ +import logging +import queue import threading import time -import queue -import logging import yaml -from pkg_resources import iter_entry_points +from copy import deepcopy +from http import HTTPStatus +from os import mkdir +from importlib_metadata import entry_points +from shutil import rmtree +from unittest.mock import Mock + # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version from osm_common.dbbase import DbException -# from osm_common.fsbase import FsException -# from osm_common.msgbase import MsgException from osm_ro_plugin.vim_dummy import VimDummyConnector from osm_ro_plugin.sdn_dummy import SdnDummyConnector from osm_ro_plugin import vimconn, sdnconn -from copy import deepcopy -from unittest.mock import Mock -from http import HTTPStatus -from os import mkdir -from shutil import rmtree +from osm_ng_ro.vim_admin import LockRenew + __author__ = "Alfonso Tierno" __date__ = "$28-Sep-2017 12:07:15$" @@ -70,12 +71,18 @@ class NsWorkerException(Exception): class FailingConnector: def __init__(self, error_msg): self.error_msg = error_msg + for method in dir(vimconn.VimConnector): if method[0] != "_": - setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg))) + setattr( + self, method, Mock(side_effect=vimconn.VimConnException(error_msg)) + ) + for method in dir(sdnconn.SdnConnectorBase): if method[0] != "_": - setattr(self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))) + setattr( + self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg)) + ) class NsWorkerExceptionNotFound(NsWorkerException): @@ -83,8 +90,9 @@ class NsWorkerExceptionNotFound(NsWorkerException): class VimInteractionBase: - """ Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ... + """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ... It implements methods that does nothing and return ok""" + def __init__(self, db, my_vims, db_vims, logger): self.db = db self.logger = logger @@ -98,6 +106,7 @@ class VimInteractionBase: """skip calling VIM to get image, flavor status. Assumes ok""" if ro_task["vim_info"]["vim_status"] == "VIM_ERROR": return "FAILED", {} + return "DONE", {} def delete(self, ro_task, task_index): @@ -109,7 +118,6 @@ class VimInteractionBase: class VimInteractionNet(VimInteractionBase): - def new(self, ro_task, task_index, task_depends): vim_net_id = None task = ro_task["tasks"][task_index] @@ -117,29 +125,56 @@ class VimInteractionNet(VimInteractionBase): created = False created_items = {} target_vim = self.my_vims[ro_task["target_id"]] + try: # FIND if task.get("find_params"): # if management, get configuration of VIM if task["find_params"].get("filter_dict"): vim_filter = task["find_params"]["filter_dict"] - elif task["find_params"].get("mgmt"): # mamagement network - if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"): - vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]} - elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"): - vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]} + # mamagement network + elif task["find_params"].get("mgmt"): + if deep_get( + self.db_vims[ro_task["target_id"]], + "config", + "management_network_id", + ): + vim_filter = { + "id": self.db_vims[ro_task["target_id"]]["config"][ + "management_network_id" + ] + } + elif deep_get( + self.db_vims[ro_task["target_id"]], + "config", + "management_network_name", + ): + vim_filter = { + "name": self.db_vims[ro_task["target_id"]]["config"][ + "management_network_name" + ] + } else: vim_filter = {"name": task["find_params"]["name"]} else: - raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"])) + raise NsWorkerExceptionNotFound( + "Invalid find_params for new_net {}".format(task["find_params"]) + ) vim_nets = target_vim.get_network_list(vim_filter) if not vim_nets and not task.get("params"): - raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format( - task.get("find_params"))) + raise NsWorkerExceptionNotFound( + "Network not found with this criteria: '{}'".format( + task.get("find_params") + ) + ) elif len(vim_nets) > 1: raise NsWorkerException( - "More than one network found with this criteria: '{}'".format(task["find_params"])) + "More than one network found with this criteria: '{}'".format( + task["find_params"] + ) + ) + if vim_nets: vim_net_id = vim_nets[0]["id"] else: @@ -148,31 +183,43 @@ class VimInteractionNet(VimInteractionBase): vim_net_id, created_items = target_vim.new_network(**params) created = True - ro_vim_item_update = {"vim_id": vim_net_id, - "vim_status": "BUILD", - "created": created, - "created_items": created_items, - "vim_details": None} + ro_vim_item_update = { + "vim_id": vim_net_id, + "vim_status": "BUILD", + "created": created, + "created_items": created_items, + "vim_details": None, + } self.logger.debug( - "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created)) + "task={} {} new-net={} created={}".format( + task_id, ro_task["target_id"], vim_net_id, created + ) + ) + return "BUILD", ro_vim_item_update except (vimconn.VimConnException, NsWorkerException) as e: - self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "created": created, - "vim_details": str(e)} + self.logger.error( + "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + return "FAILED", ro_vim_item_update def refresh(self, ro_task): """Call VIM to get network status""" ro_task_id = ro_task["_id"] target_vim = self.my_vims[ro_task["target_id"]] - vim_id = ro_task["vim_info"]["vim_id"] net_to_refresh_list = [vim_id] + try: vim_dict = target_vim.refresh_nets_status(net_to_refresh_list) vim_info = vim_dict[vim_id] + if vim_info["status"] == "ACTIVE": task_status = "DONE" elif vim_info["status"] == "BUILD": @@ -181,15 +228,21 @@ class VimInteractionNet(VimInteractionBase): task_status = "FAILED" except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status - self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) + self.logger.error( + "ro_task={} vim={} get-net={}: {}".format( + ro_task_id, ro_task["target_id"], vim_id, e + ) + ) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} task_status = "FAILED" ro_vim_item_update = {} if ro_task["vim_info"]["vim_status"] != vim_info["status"]: ro_vim_item_update["vim_status"] = vim_info["status"] + if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): ro_vim_item_update["vim_name"] = vim_info.get("name") + if vim_info["status"] in ("ERROR", "VIM_ERROR"): if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"): ro_vim_item_update["vim_details"] = vim_info.get("error_msg") @@ -199,43 +252,69 @@ class VimInteractionNet(VimInteractionBase): else: if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: ro_vim_item_update["vim_details"] = vim_info["vim_info"] + if ro_vim_item_update: - self.logger.debug("ro_task={} {} get-net={}: status={} {}".format( - ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), - ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) + self.logger.debug( + "ro_task={} {} get-net={}: status={} {}".format( + ro_task_id, + ro_task["target_id"], + vim_id, + ro_vim_item_update.get("vim_status"), + ro_vim_item_update.get("vim_details") + if ro_vim_item_update.get("vim_status") != "ACTIVE" + else "", + ) + ) + return task_status, ro_vim_item_update def delete(self, ro_task, task_index): task = ro_task["tasks"][task_index] task_id = task["task_id"] net_vim_id = ro_task["vim_info"]["vim_id"] - ro_vim_item_update_ok = {"vim_status": "DELETED", - "created": False, - "vim_details": "DELETED", - "vim_id": None} + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None, + } + try: if net_vim_id or ro_task["vim_info"]["created_items"]: target_vim = self.my_vims[ro_task["target_id"]] - target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"]) - + target_vim.delete_network( + net_vim_id, ro_task["vim_info"]["created_items"] + ) except vimconn.VimConnNotFoundException: ro_vim_item_update_ok["vim_details"] = "already deleted" - except vimconn.VimConnException as e: - self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"], - net_vim_id, e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e)} + self.logger.error( + "ro_task={} vim={} del-net={}: {}".format( + ro_task["_id"], ro_task["target_id"], net_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e), + } + return "FAILED", ro_vim_item_update - self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id, - ro_vim_item_update_ok.get("vim_details", ""))) + self.logger.debug( + "task={} {} del-net={} {}".format( + task_id, + ro_task["target_id"], + net_vim_id, + ro_vim_item_update_ok.get("vim_details", ""), + ) + ) + return "DONE", ro_vim_item_update_ok class VimInteractionVdu(VimInteractionBase): - max_retries_inject_ssh_key = 20 # 20 times - time_retries_inject_ssh_key = 30 # wevery 30 seconds + max_retries_inject_ssh_key = 20 # 20 times + time_retries_inject_ssh_key = 30 # wevery 30 seconds def new(self, ro_task, task_index, task_depends): task = ro_task["tasks"][task_index] @@ -243,89 +322,127 @@ class VimInteractionVdu(VimInteractionBase): created = False created_items = {} target_vim = self.my_vims[ro_task["target_id"]] + try: created = True params = task["params"] params_copy = deepcopy(params) net_list = params_copy["net_list"] + for net in net_list: - if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id + # change task_id into network_id + if "net_id" in net and net["net_id"].startswith("TASK-"): network_id = task_depends[net["net_id"]] + if not network_id: - raise NsWorkerException("Cannot create VM because depends on a network not created or found " - "for {}".format(net["net_id"])) + raise NsWorkerException( + "Cannot create VM because depends on a network not created or found " + "for {}".format(net["net_id"]) + ) + net["net_id"] = network_id + if params_copy["image_id"].startswith("TASK-"): params_copy["image_id"] = task_depends[params_copy["image_id"]] + if params_copy["flavor_id"].startswith("TASK-"): params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] - ro_vim_item_update = {"vim_id": vim_vm_id, - "vim_status": "BUILD", - "created": created, - "created_items": created_items, - "vim_details": None, - "interfaces_vim_ids": interfaces, - "interfaces": [], - } + ro_vim_item_update = { + "vim_id": vim_vm_id, + "vim_status": "BUILD", + "created": created, + "created_items": created_items, + "vim_details": None, + "interfaces_vim_ids": interfaces, + "interfaces": [], + } self.logger.debug( - "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created)) + "task={} {} new-vm={} created={}".format( + task_id, ro_task["target_id"], vim_vm_id, created + ) + ) + return "BUILD", ro_vim_item_update except (vimconn.VimConnException, NsWorkerException) as e: - self.logger.error("task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "created": created, - "vim_details": str(e)} + self.logger.error( + "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + return "FAILED", ro_vim_item_update def delete(self, ro_task, task_index): task = ro_task["tasks"][task_index] task_id = task["task_id"] vm_vim_id = ro_task["vim_info"]["vim_id"] - ro_vim_item_update_ok = {"vim_status": "DELETED", - "created": False, - "vim_details": "DELETED", - "vim_id": None} + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None, + } + try: if vm_vim_id or ro_task["vim_info"]["created_items"]: target_vim = self.my_vims[ro_task["target_id"]] - target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"]) - + target_vim.delete_vminstance( + vm_vim_id, ro_task["vim_info"]["created_items"] + ) except vimconn.VimConnNotFoundException: ro_vim_item_update_ok["vim_details"] = "already deleted" - except vimconn.VimConnException as e: - self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"], - vm_vim_id, e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e)} + self.logger.error( + "ro_task={} vim={} del-vm={}: {}".format( + ro_task["_id"], ro_task["target_id"], vm_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e), + } + return "FAILED", ro_vim_item_update - self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id, - ro_vim_item_update_ok.get("vim_details", ""))) + self.logger.debug( + "task={} {} del-vm={} {}".format( + task_id, + ro_task["target_id"], + vm_vim_id, + ro_vim_item_update_ok.get("vim_details", ""), + ) + ) + return "DONE", ro_vim_item_update_ok def refresh(self, ro_task): """Call VIM to get vm status""" ro_task_id = ro_task["_id"] target_vim = self.my_vims[ro_task["target_id"]] - vim_id = ro_task["vim_info"]["vim_id"] + if not vim_id: return None, None + vm_to_refresh_list = [vim_id] try: vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) vim_info = vim_dict[vim_id] + if vim_info["status"] == "ACTIVE": task_status = "DONE" elif vim_info["status"] == "BUILD": task_status = "BUILD" else: task_status = "FAILED" + # try to load and parse vim_information try: vim_info_info = yaml.safe_load(vim_info["vim_info"]) @@ -335,34 +452,57 @@ class VimInteractionVdu(VimInteractionBase): pass except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status - self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) + self.logger.error( + "ro_task={} vim={} get-vm={}: {}".format( + ro_task_id, ro_task["target_id"], vim_id, e + ) + ) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} task_status = "FAILED" ro_vim_item_update = {} + # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED vim_interfaces = [] if vim_info.get("interfaces"): for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]: - iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), - None) + iface = next( + ( + iface + for iface in vim_info["interfaces"] + if vim_iface_id == iface["vim_interface_id"] + ), + None, + ) # if iface: # iface.pop("vim_info", None) vim_interfaces.append(iface) - task_create = next(t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and t["status"] != "FINISHED") + task_create = next( + t + for t in ro_task["tasks"] + if t and t["action"] == "CREATE" and t["status"] != "FINISHED" + ) if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None: - vim_interfaces[task_create["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True - mgmt_vdu_iface = task_create.get("mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)) + vim_interfaces[task_create["mgmt_vnf_interface"]][ + "mgmt_vnf_interface" + ] = True + + mgmt_vdu_iface = task_create.get( + "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0) + ) if vim_interfaces: vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True if ro_task["vim_info"]["interfaces"] != vim_interfaces: ro_vim_item_update["interfaces"] = vim_interfaces + if ro_task["vim_info"]["vim_status"] != vim_info["status"]: ro_vim_item_update["vim_status"] = vim_info["status"] + if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): ro_vim_item_update["vim_name"] = vim_info.get("name") + if vim_info["status"] in ("ERROR", "VIM_ERROR"): if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"): ro_vim_item_update["vim_details"] = vim_info.get("error_msg") @@ -372,10 +512,20 @@ class VimInteractionVdu(VimInteractionBase): else: if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: ro_vim_item_update["vim_details"] = vim_info["vim_info"] + if ro_vim_item_update: - self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format( - ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), - ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) + self.logger.debug( + "ro_task={} {} get-vm={}: status={} {}".format( + ro_task_id, + ro_task["target_id"], + vim_id, + ro_vim_item_update.get("vim_status"), + ro_vim_item_update.get("vim_details") + if ro_vim_item_update.get("vim_status") != "ACTIVE" + else "", + ) + ) + return task_status, ro_vim_item_update def exec(self, ro_task, task_index, task_depends): @@ -384,89 +534,142 @@ class VimInteractionVdu(VimInteractionBase): target_vim = self.my_vims[ro_task["target_id"]] db_task_update = {"retries": 0} retries = task.get("retries", 0) + try: params = task["params"] params_copy = deepcopy(params) - params_copy["ro_key"] = self.db.decrypt(params_copy.pop("private_key"), - params_copy.pop("schema_version"), params_copy.pop("salt")) + params_copy["ro_key"] = self.db.decrypt( + params_copy.pop("private_key"), + params_copy.pop("schema_version"), + params_copy.pop("salt"), + ) params_copy["ip_addr"] = params_copy.pop("ip_address") target_vim.inject_user_key(**params_copy) self.logger.debug( - "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])) - return "DONE", None, db_task_update, # params_copy["key"] + "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]) + ) + + return ( + "DONE", + None, + db_task_update, + ) # params_copy["key"] except (vimconn.VimConnException, NsWorkerException) as e: retries += 1 + if retries < self.max_retries_inject_ssh_key: - return "BUILD", None, {"retries": retries, "next_retry": self.time_retries_inject_ssh_key} - self.logger.error("task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)) + return ( + "BUILD", + None, + { + "retries": retries, + "next_retry": self.time_retries_inject_ssh_key, + }, + ) + + self.logger.error( + "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e) + ) ro_vim_item_update = {"vim_details": str(e)} + return "FAILED", ro_vim_item_update, db_task_update class VimInteractionImage(VimInteractionBase): - def new(self, ro_task, task_index, task_depends): task = ro_task["tasks"][task_index] task_id = task["task_id"] created = False created_items = {} target_vim = self.my_vims[ro_task["target_id"]] + try: # FIND if task.get("find_params"): vim_images = target_vim.get_image_list(**task["find_params"]) + if not vim_images: - raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format( - task["find_params"])) + raise NsWorkerExceptionNotFound( + "Image not found with this criteria: '{}'".format( + task["find_params"] + ) + ) elif len(vim_images) > 1: raise NsWorkerException( - "More than one network found with this criteria: '{}'".format(task["find_params"])) + "More than one network found with this criteria: '{}'".format( + task["find_params"] + ) + ) else: vim_image_id = vim_images[0]["id"] - ro_vim_item_update = {"vim_id": vim_image_id, - "vim_status": "DONE", - "created": created, - "created_items": created_items, - "vim_details": None} + ro_vim_item_update = { + "vim_id": vim_image_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + } self.logger.debug( - "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created)) + "task={} {} new-image={} created={}".format( + task_id, ro_task["target_id"], vim_image_id, created + ) + ) + return "DONE", ro_vim_item_update except (NsWorkerException, vimconn.VimConnException) as e: - self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "created": created, - "vim_details": str(e)} + self.logger.error( + "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + return "FAILED", ro_vim_item_update class VimInteractionFlavor(VimInteractionBase): - def delete(self, ro_task, task_index): task = ro_task["tasks"][task_index] task_id = task["task_id"] flavor_vim_id = ro_task["vim_info"]["vim_id"] - ro_vim_item_update_ok = {"vim_status": "DELETED", - "created": False, - "vim_details": "DELETED", - "vim_id": None} + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None, + } + try: if flavor_vim_id: target_vim = self.my_vims[ro_task["target_id"]] target_vim.delete_flavor(flavor_vim_id) - except vimconn.VimConnNotFoundException: ro_vim_item_update_ok["vim_details"] = "already deleted" - except vimconn.VimConnException as e: - self.logger.error("ro_task={} vim={} del-flavor={}: {}".format( - ro_task["_id"], ro_task["target_id"], flavor_vim_id, e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e)} + self.logger.error( + "ro_task={} vim={} del-flavor={}: {}".format( + ro_task["_id"], ro_task["target_id"], flavor_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e), + } + return "FAILED", ro_vim_item_update - self.logger.debug("task={} {} del-flavor={} {}".format( - task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", ""))) + self.logger.debug( + "task={} {} del-flavor={} {}".format( + task_id, + ro_task["target_id"], + flavor_vim_id, + ro_vim_item_update_ok.get("vim_details", ""), + ) + ) + return "DONE", ro_vim_item_update_ok def new(self, ro_task, task_index, task_depends): @@ -475,9 +678,11 @@ class VimInteractionFlavor(VimInteractionBase): created = False created_items = {} target_vim = self.my_vims[ro_task["target_id"]] + try: # FIND vim_flavor_id = None + if task.get("find_params"): try: flavor_data = task["find_params"]["flavor_data"] @@ -491,24 +696,34 @@ class VimInteractionFlavor(VimInteractionBase): vim_flavor_id = target_vim.new_flavor(flavor_data) created = True - ro_vim_item_update = {"vim_id": vim_flavor_id, - "vim_status": "DONE", - "created": created, - "created_items": created_items, - "vim_details": None} + ro_vim_item_update = { + "vim_id": vim_flavor_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None, + } self.logger.debug( - "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created)) + "task={} {} new-flavor={} created={}".format( + task_id, ro_task["target_id"], vim_flavor_id, created + ) + ) + return "DONE", ro_vim_item_update except (vimconn.VimConnException, NsWorkerException) as e: - self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "created": created, - "vim_details": str(e)} + self.logger.error( + "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + return "FAILED", ro_vim_item_update class VimInteractionSdnNet(VimInteractionBase): - @staticmethod def _match_pci(port_pci, mapping): """ @@ -528,21 +743,34 @@ class VimInteractionSdnNet(VimInteractionBase): pci_index = 0 while True: bracket_start = mapping.find("[", mapping_index) + if bracket_start == -1: break + bracket_end = mapping.find("]", bracket_start) if bracket_end == -1: break + length = bracket_start - mapping_index - if length and port_pci[pci_index:pci_index + length] != mapping[mapping_index:bracket_start]: + if ( + length + and port_pci[pci_index : pci_index + length] + != mapping[mapping_index:bracket_start] + ): return False - if port_pci[pci_index + length] not in mapping[bracket_start+1:bracket_end]: + + if ( + port_pci[pci_index + length] + not in mapping[bracket_start + 1 : bracket_end] + ): return False + pci_index += length + 1 mapping_index = bracket_end + 1 if port_pci[pci_index:] != mapping[mapping_index:]: return False + return True def _get_interfaces(self, vlds_to_connect, vim_account_id): @@ -552,35 +780,49 @@ class VimInteractionSdnNet(VimInteractionBase): :return: """ interfaces = [] + for vld in vlds_to_connect: table, _, db_id = vld.partition(":") db_id, _, vld = db_id.partition(":") _, _, vld_id = vld.partition(".") + if table == "vnfrs": q_filter = {"vim-account-id": vim_account_id, "_id": db_id} iface_key = "vnf-vld-id" else: # table == "nsrs" q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id} iface_key = "ns-vld-id" + db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter) + for db_vnfr in db_vnfrs: for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())): for iface_index, interface in enumerate(vdur["interfaces"]): - if interface.get(iface_key) == vld_id and \ - interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): + if interface.get(iface_key) == vld_id and interface.get( + "type" + ) in ("SR-IOV", "PCI-PASSTHROUGH"): # only SR-IOV o PT interface_ = interface.copy() - interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(db_vnfr["_id"], vdu_index, - iface_index) + interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format( + db_vnfr["_id"], vdu_index, iface_index + ) + if vdur.get("status") == "ERROR": interface_["status"] = "ERROR" + interfaces.append(interface_) + return interfaces def refresh(self, ro_task): # look for task create - task_create_index, _ = next(i_t for i_t in enumerate(ro_task["tasks"]) - if i_t[1] and i_t[1]["action"] == "CREATE" and i_t[1]["status"] != "FINISHED") + task_create_index, _ = next( + i_t + for i_t in enumerate(ro_task["tasks"]) + if i_t[1] + and i_t[1]["action"] == "CREATE" + and i_t[1]["status"] != "FINISHED" + ) return self.new(ro_task, task_create_index, None) @@ -601,17 +843,21 @@ class VimInteractionSdnNet(VimInteractionBase): created = ro_task["vim_info"].get("created", False) try: - # CREATE params = task["params"] vlds_to_connect = params["vlds"] associated_vim = params["target_vim"] - additional_ports = params.get("sdn-ports") or () # external additional ports + # external additional ports + additional_ports = params.get("sdn-ports") or () _, _, vim_account_id = associated_vim.partition(":") + if associated_vim: # get associated VIM if associated_vim not in self.db_vims: - self.db_vims[associated_vim] = self.db.get_one("vim_accounts", {"_id": vim_account_id}) + self.db_vims[associated_vim] = self.db.get_one( + "vim_accounts", {"_id": vim_account_id} + ) + db_vim = self.db_vims[associated_vim] # look for ports to connect @@ -622,8 +868,10 @@ class VimInteractionSdnNet(VimInteractionBase): pending_ports = error_ports = 0 vlan_used = None sdn_need_update = False + for port in ports: vlan_used = port.get("vlan") or vlan_used + # TODO. Do not connect if already done if not port.get("compute_node") or not port.get("pci"): if port.get("status") == "ERROR": @@ -631,33 +879,56 @@ class VimInteractionSdnNet(VimInteractionBase): else: pending_ports += 1 continue + pmap = None - compute_node_mappings = next((c for c in db_vim["config"].get("sdn-port-mapping", ()) - if c and c["compute_node"] == port["compute_node"]), None) + compute_node_mappings = next( + ( + c + for c in db_vim["config"].get("sdn-port-mapping", ()) + if c and c["compute_node"] == port["compute_node"] + ), + None, + ) + if compute_node_mappings: # process port_mapping pci of type 0000:af:1[01].[1357] - pmap = next((p for p in compute_node_mappings["ports"] - if self._match_pci(port["pci"], p.get("pci"))), None) + pmap = next( + ( + p + for p in compute_node_mappings["ports"] + if self._match_pci(port["pci"], p.get("pci")) + ), + None, + ) + if not pmap: if not db_vim["config"].get("mapping_not_needed"): - error_list.append("Port mapping not found for compute_node={} pci={}".format( - port["compute_node"], port["pci"])) + error_list.append( + "Port mapping not found for compute_node={} pci={}".format( + port["compute_node"], port["pci"] + ) + ) continue + pmap = {} service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"]) new_port = { - "service_endpoint_id": pmap.get("service_endpoint_id") or service_endpoint_id, - "service_endpoint_encapsulation_type": "dot1q" if port["type"] == "SR-IOV" else None, + "service_endpoint_id": pmap.get("service_endpoint_id") + or service_endpoint_id, + "service_endpoint_encapsulation_type": "dot1q" + if port["type"] == "SR-IOV" + else None, "service_endpoint_encapsulation_info": { "vlan": port.get("vlan"), - "mac": port.get("mac_address"), - "device_id": pmap.get("device_id") or port["compute_node"], # device_id - "device_interface_id": pmap.get("device_interface_id") or port["pci"], + "mac": port.get("mac-address"), + "device_id": pmap.get("device_id") or port["compute_node"], + "device_interface_id": pmap.get("device_interface_id") + or port["pci"], "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"), "switch_port": pmap.get("switch_port"), "service_mapping_info": pmap.get("service_mapping_info"), - } + }, } # TODO @@ -667,109 +938,179 @@ class VimInteractionSdnNet(VimInteractionBase): sdn_ports.append(new_port) if error_ports: - error_list.append("{} interfaces have not been created as VDU is on ERROR status".format(error_ports)) + error_list.append( + "{} interfaces have not been created as VDU is on ERROR status".format( + error_ports + ) + ) # connect external ports for index, additional_port in enumerate(additional_ports): - additional_port_id = additional_port.get("service_endpoint_id") or "external-{}".format(index) - sdn_ports.append({ - "service_endpoint_id": additional_port_id, - "service_endpoint_encapsulation_type": additional_port.get("service_endpoint_encapsulation_type", - "dot1q"), - "service_endpoint_encapsulation_info": { - "vlan": additional_port.get("vlan") or vlan_used, - "mac": additional_port.get("mac_address"), - "device_id": additional_port.get("device_id"), - "device_interface_id": additional_port.get("device_interface_id"), - "switch_dpid": additional_port.get("switch_dpid") or additional_port.get("switch_id"), - "switch_port": additional_port.get("switch_port"), - "service_mapping_info": additional_port.get("service_mapping_info"), - }}) + additional_port_id = additional_port.get( + "service_endpoint_id" + ) or "external-{}".format(index) + sdn_ports.append( + { + "service_endpoint_id": additional_port_id, + "service_endpoint_encapsulation_type": additional_port.get( + "service_endpoint_encapsulation_type", "dot1q" + ), + "service_endpoint_encapsulation_info": { + "vlan": additional_port.get("vlan") or vlan_used, + "mac": additional_port.get("mac_address"), + "device_id": additional_port.get("device_id"), + "device_interface_id": additional_port.get( + "device_interface_id" + ), + "switch_dpid": additional_port.get("switch_dpid") + or additional_port.get("switch_id"), + "switch_port": additional_port.get("switch_port"), + "service_mapping_info": additional_port.get( + "service_mapping_info" + ), + }, + } + ) new_connected_ports.append(additional_port_id) sdn_info = "" + # if there are more ports to connect or they have been modified, call create/update if error_list: sdn_status = "ERROR" sdn_info = "; ".join(error_list) elif set(connected_ports) != set(new_connected_ports) or sdn_need_update: last_update = time.time() + if not sdn_net_id: if len(sdn_ports) < 2: sdn_status = "ACTIVE" + if not pending_ports: - self.logger.debug("task={} {} new-sdn-net done, less than 2 ports". - format(task_id, ro_task["target_id"])) + self.logger.debug( + "task={} {} new-sdn-net done, less than 2 ports".format( + task_id, ro_task["target_id"] + ) + ) else: net_type = params.get("type") or "ELAN" - sdn_net_id, created_items = target_vim.create_connectivity_service( - net_type, sdn_ports) + ( + sdn_net_id, + created_items, + ) = target_vim.create_connectivity_service(net_type, sdn_ports) created = True - self.logger.debug("task={} {} new-sdn-net={} created={}". - format(task_id, ro_task["target_id"], sdn_net_id, created)) + self.logger.debug( + "task={} {} new-sdn-net={} created={}".format( + task_id, ro_task["target_id"], sdn_net_id, created + ) + ) else: created_items = target_vim.edit_connectivity_service( - sdn_net_id, conn_info=created_items, connection_points=sdn_ports) + sdn_net_id, conn_info=created_items, connection_points=sdn_ports + ) created = True - self.logger.debug("task={} {} update-sdn-net={} created={}". - format(task_id, ro_task["target_id"], sdn_net_id, created)) + self.logger.debug( + "task={} {} update-sdn-net={} created={}".format( + task_id, ro_task["target_id"], sdn_net_id, created + ) + ) + connected_ports = new_connected_ports elif sdn_net_id: - wim_status_dict = target_vim.get_connectivity_service_status(sdn_net_id, conn_info=created_items) + wim_status_dict = target_vim.get_connectivity_service_status( + sdn_net_id, conn_info=created_items + ) sdn_status = wim_status_dict["sdn_status"] + if wim_status_dict.get("sdn_info"): sdn_info = str(wim_status_dict.get("sdn_info")) or "" + if wim_status_dict.get("error_msg"): sdn_info = wim_status_dict.get("error_msg") or "" if pending_ports: if sdn_status != "ERROR": sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format( - len(ports)-pending_ports, len(ports)) + len(ports) - pending_ports, len(ports) + ) + if sdn_status == "ACTIVE": sdn_status = "BUILD" - ro_vim_item_update = {"vim_id": sdn_net_id, - "vim_status": sdn_status, - "created": created, - "created_items": created_items, - "connected_ports": connected_ports, - "vim_details": sdn_info, - "last_update": last_update} + ro_vim_item_update = { + "vim_id": sdn_net_id, + "vim_status": sdn_status, + "created": created, + "created_items": created_items, + "connected_ports": connected_ports, + "vim_details": sdn_info, + "last_update": last_update, + } + return sdn_status, ro_vim_item_update except Exception as e: - self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e), - exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException))) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "created": created, - "vim_details": str(e)} + self.logger.error( + "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e), + exc_info=not isinstance( + e, (sdnconn.SdnConnectorError, vimconn.VimConnException) + ), + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e), + } + return "FAILED", ro_vim_item_update def delete(self, ro_task, task_index): task = ro_task["tasks"][task_index] task_id = task["task_id"] sdn_vim_id = ro_task["vim_info"].get("vim_id") - ro_vim_item_update_ok = {"vim_status": "DELETED", - "created": False, - "vim_details": "DELETED", - "vim_id": None} + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None, + } + try: if sdn_vim_id: target_vim = self.my_vims[ro_task["target_id"]] - target_vim.delete_connectivity_service(sdn_vim_id, ro_task["vim_info"].get("created_items")) + target_vim.delete_connectivity_service( + sdn_vim_id, ro_task["vim_info"].get("created_items") + ) except Exception as e: - if isinstance(e, sdnconn.SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value: + if ( + isinstance(e, sdnconn.SdnConnectorError) + and e.http_code == HTTPStatus.NOT_FOUND.value + ): ro_vim_item_update_ok["vim_details"] = "already deleted" else: - self.logger.error("ro_task={} vim={} del-sdn-net={}: {}".format(ro_task["_id"], ro_task["target_id"], - sdn_vim_id, e), - exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException))) - ro_vim_item_update = {"vim_status": "VIM_ERROR", - "vim_details": "Error while deleting: {}".format(e)} + self.logger.error( + "ro_task={} vim={} del-sdn-net={}: {}".format( + ro_task["_id"], ro_task["target_id"], sdn_vim_id, e + ), + exc_info=not isinstance( + e, (sdnconn.SdnConnectorError, vimconn.VimConnException) + ), + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e), + } + return "FAILED", ro_vim_item_update - self.logger.debug("task={} {} del-sdn-net={} {}".format(task_id, ro_task["target_id"], sdn_vim_id, - ro_vim_item_update_ok.get("vim_details", ""))) + self.logger.debug( + "task={} {} del-sdn-net={} {}".format( + task_id, + ro_task["target_id"], + sdn_vim_id, + ro_vim_item_update_ok.get("vim_details", ""), + ) + ) + return "DONE", ro_vim_item_update_ok @@ -779,12 +1120,8 @@ class NsWorker(threading.Thread): REFRESH_ERROR = 600 REFRESH_IMAGE = 3600 * 10 REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 2000 - # TODO delete assigment_lock = Lock() + QUEUE_SIZE = 100 terminate = False - # TODO delete assignment = {} - MAX_TIME_LOCKED = 3600 - MAX_TIME_VIM_LOCKED = 120 def __init__(self, worker_index, config, plugins, db): """ @@ -798,23 +1135,36 @@ class NsWorker(threading.Thread): self.config = config self.plugins = plugins self.plugin_name = "unknown" - self.logger = logging.getLogger('ro.worker{}'.format(worker_index)) + self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index self.task_queue = queue.Queue(self.QUEUE_SIZE) - self.my_vims = {} # targetvim: vimplugin class - self.db_vims = {} # targetvim: vim information from database - self.vim_targets = [] # targetvim list + # targetvim: vimplugin class + self.my_vims = {} + # targetvim: vim information from database + self.db_vims = {} + # targetvim list + self.vim_targets = [] self.my_id = config["process_id"] + ":" + str(worker_index) self.db = db self.item2class = { "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger), "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger), - "image": VimInteractionImage(self.db, self.my_vims, self.db_vims, self.logger), - "flavor": VimInteractionFlavor(self.db, self.my_vims, self.db_vims, self.logger), - "sdn_net": VimInteractionSdnNet(self.db, self.my_vims, self.db_vims, self.logger), + "image": VimInteractionImage( + self.db, self.my_vims, self.db_vims, self.logger + ), + "flavor": VimInteractionFlavor( + self.db, self.my_vims, self.db_vims, self.logger + ), + "sdn_net": VimInteractionSdnNet( + self.db, self.my_vims, self.db_vims, self.logger + ), } self.time_last_task_processed = None - self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db + # lists of tasks to delete because nsrs or vnfrs has been deleted from db + self.tasks_to_delete = [] + # it is idle when there are not vim_targets associated + self.idle = True + self.task_locked_time = config["global"]["task_locked_time"] def insert_task(self, task): try: @@ -844,37 +1194,51 @@ class NsWorker(threading.Thread): """ if not db_vim.get("config"): return + file_name = "" + try: if db_vim["config"].get("ca_cert_content"): file_name = "{}:{}".format(target_id, self.worker_index) + try: mkdir(file_name) except FileExistsError: pass + file_name = file_name + "/ca_cert" + with open(file_name, "w") as f: f.write(db_vim["config"]["ca_cert_content"]) del db_vim["config"]["ca_cert_content"] db_vim["config"]["ca_cert"] = file_name except Exception as e: - raise NsWorkerException("Error writing to file '{}': {}".format(file_name, e)) + raise NsWorkerException( + "Error writing to file '{}': {}".format(file_name, e) + ) def _load_plugin(self, name, type="vim"): # type can be vim or sdn if "rovim_dummy" not in self.plugins: self.plugins["rovim_dummy"] = VimDummyConnector + if "rosdn_dummy" not in self.plugins: self.plugins["rosdn_dummy"] = SdnDummyConnector + if name in self.plugins: return self.plugins[name] + try: - for v in iter_entry_points('osm_ro{}.plugins'.format(type), name): - self.plugins[name] = v.load() + for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): + self.plugins[name] = ep.load() except Exception as e: raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e)) + if name and name not in self.plugins: - raise NsWorkerException("Plugin 'osm_{n}' has not been installed".format(n=name)) + raise NsWorkerException( + "Plugin 'osm_{n}' has not been installed".format(n=name) + ) + return self.plugins[name] def _unload_vim(self, target_id): @@ -884,10 +1248,13 @@ class NsWorker(threading.Thread): :return: None. """ try: - target, _, _id = target_id.partition(":") self.db_vims.pop(target_id, None) self.my_vims.pop(target_id, None) - self.vim_targets.remove(target_id) + + if target_id in self.vim_targets: + self.vim_targets.remove(target_id) + + self.logger.info("Unloaded {}".format(target_id)) rmtree("{}:{}".format(target_id, self.worker_index)) except FileNotFoundError: pass # this is raised by rmtree if folder does not exist @@ -906,43 +1273,67 @@ class NsWorker(threading.Thread): unset_dict = {} op_text = "" step = "" - loaded = target_id in self.my_vims - target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns" + loaded = target_id in self.vim_targets + target_database = ( + "vim_accounts" + if target == "vim" + else "wim_accounts" + if target == "wim" + else "sdns" + ) + try: step = "Getting {} from db".format(target_id) db_vim = self.db.get_one(target_database, {"_id": _id}) - for op_index, operation in enumerate(db_vim["_admin"].get("operations", ())): + + for op_index, operation in enumerate( + db_vim["_admin"].get("operations", ()) + ): if operation["operationState"] != "PROCESSING": continue + locked_at = operation.get("locked_at") - if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED: + + if locked_at is not None and locked_at >= now - self.task_locked_time: # some other thread is doing this operation return + # lock op_text = "_admin.operations.{}.".format(op_index) - if not self.db.set_one(target_database, - q_filter={"_id": _id, - op_text + "operationState": "PROCESSING", - op_text + "locked_at": locked_at - }, - update_dict={op_text + "locked_at": now, - "admin.current_operation": op_index}, - fail_on_empty=False): + + if not self.db.set_one( + target_database, + q_filter={ + "_id": _id, + op_text + "operationState": "PROCESSING", + op_text + "locked_at": locked_at, + }, + update_dict={ + op_text + "locked_at": now, + "admin.current_operation": op_index, + }, + fail_on_empty=False, + ): return + unset_dict[op_text + "locked_at"] = None unset_dict["current_operation"] = None step = "Loading " + target_id error_text = self._load_vim(target_id) + if not error_text: step = "Checking connectivity" - if target == 'vim': + + if target == "vim": self.my_vims[target_id].check_vim_connectivity() else: self.my_vims[target_id].check_credentials() + update_dict["_admin.operationalState"] = "ENABLED" update_dict["_admin.detailed-status"] = "" unset_dict[op_text + "detailed-status"] = None update_dict[op_text + "operationState"] = "COMPLETED" + return except Exception as e: @@ -957,10 +1348,18 @@ class NsWorker(threading.Thread): unset_dict.pop(op_text + "detailed-status", None) update_dict["_admin.operationalState"] = "ERROR" update_dict["_admin.detailed-status"] = error_text + if op_text: update_dict[op_text + "statusEnteredTime"] = now - self.db.set_one(target_database, q_filter={"_id": _id}, update_dict=update_dict, unset=unset_dict, - fail_on_empty=False) + + self.db.set_one( + target_database, + q_filter={"_id": _id}, + update_dict=update_dict, + unset=unset_dict, + fail_on_empty=False, + ) + if not loaded: self._unload_vim(target_id) @@ -982,9 +1381,16 @@ class NsWorker(threading.Thread): :return: None if ok, descriptive text if error """ target, _, _id = target_id.partition(":") - target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns" + target_database = ( + "vim_accounts" + if target == "vim" + else "wim_accounts" + if target == "wim" + else "sdns" + ) plugin_name = "" vim = None + try: step = "Getting {}={} from db".format(target, _id) # TODO process for wim, sdnc, ... @@ -996,20 +1402,31 @@ class NsWorker(threading.Thread): step = "Decrypting password" schema_version = vim.get("schema_version") - self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'), - schema_version=schema_version, salt=_id) + self.db.encrypt_decrypt_fields( + vim, + "decrypt", + fields=("password", "secret"), + schema_version=schema_version, + salt=_id, + ) self._process_vim_config(target_id, vim) + if target == "vim": plugin_name = "rovim_" + vim["vim_type"] step = "Loading plugin '{}'".format(plugin_name) vim_module_conn = self._load_plugin(plugin_name) step = "Loading {}'".format(target_id) self.my_vims[target_id] = vim_module_conn( - uuid=vim['_id'], name=vim['name'], - tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'), - url=vim['vim_url'], url_admin=None, - user=vim['vim_user'], passwd=vim['vim_password'], - config=vim.get('config') or {}, persistent_info={} + uuid=vim["_id"], + name=vim["name"], + tenant_id=vim.get("vim_tenant_id"), + tenant_name=vim.get("vim_tenant_name"), + url=vim["vim_url"], + url_admin=None, + user=vim["vim_user"], + passwd=vim["vim_password"], + config=vim.get("config") or {}, + persistent_info={}, ) else: # sdn plugin_name = "rosdn_" + vim["type"] @@ -1020,20 +1437,32 @@ class NsWorker(threading.Thread): wim_config = wim.pop("config", {}) or {} wim["uuid"] = wim["_id"] wim["wim_url"] = wim["url"] + if wim.get("dpid"): wim_config["dpid"] = wim.pop("dpid") + if wim.get("switch_id"): wim_config["switch_id"] = wim.pop("switch_id") - self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) # wim, wim_account, config + + # wim, wim_account, config + self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) self.db_vims[target_id] = vim self.error_status = None - self.logger.info("Connector loaded for {}, plugin={}".format(target_id, plugin_name)) + + self.logger.info( + "Connector loaded for {}, plugin={}".format(target_id, plugin_name) + ) except Exception as e: - self.logger.error("Cannot load {} plugin={}: {} {}".format( - target_id, plugin_name, step, e)) + self.logger.error( + "Cannot load {} plugin={}: {} {}".format( + target_id, plugin_name, step, e + ) + ) + self.db_vims[target_id] = vim or {} self.db_vims[target_id] = FailingConnector(str(e)) error_status = "{} Error: {}".format(step, e) + return error_status finally: if target_id not in self.vim_targets: @@ -1045,26 +1474,36 @@ class NsWorker(threading.Thread): :return: None """ now = time.time() + if not self.time_last_task_processed: self.time_last_task_processed = now + try: while True: locked = self.db.set_one( "ro_tasks", - q_filter={"target_id": self.vim_targets, - "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], - "locked_at.lt": now - self.MAX_TIME_LOCKED, - "to_check_at.lt": self.time_last_task_processed}, + q_filter={ + "target_id": self.vim_targets, + "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], + "locked_at.lt": now - self.task_locked_time, + "to_check_at.lt": self.time_last_task_processed, + }, update_dict={"locked_by": self.my_id, "locked_at": now}, - fail_on_empty=False) + fail_on_empty=False, + ) + if locked: # read and return ro_task = self.db.get_one( "ro_tasks", - q_filter={"target_id": self.vim_targets, - "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], - "locked_at": now}) + q_filter={ + "target_id": self.vim_targets, + "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], + "locked_at": now, + }, + ) return ro_task + if self.time_last_task_processed == now: self.time_last_task_processed = None return None @@ -1075,7 +1514,10 @@ class NsWorker(threading.Thread): except DbException as e: self.logger.error("Database exception at _get_db_task: {}".format(e)) except Exception as e: - self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True) + self.logger.critical( + "Unexpected exception at _get_db_task: {}".format(e), exc_info=True + ) + return None def _delete_task(self, ro_task, task_index, task_depends, db_update): @@ -1085,26 +1527,45 @@ class NsWorker(threading.Thread): """ my_task = ro_task["tasks"][task_index] task_id = my_task["task_id"] - needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False) + needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get( + "created_items", False + ) + if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? + try: for index, task in enumerate(ro_task["tasks"]): if index == task_index or not task: continue # own task - if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE": + + if ( + my_task["target_record"] == task["target_record"] + and task["action"] == "CREATE" + ): # set to finished - db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED" - elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"): + db_update["tasks.{}.status".format(index)] = task[ + "status" + ] = "FINISHED" + elif task["action"] == "CREATE" and task["status"] not in ( + "FINISHED", + "SUPERSEDED", + ): needed_delete = False + if needed_delete: return self.item2class[my_task["item"]].delete(ro_task, task_index) else: return "SUPERSEDED", None except Exception as e: if not isinstance(e, NsWorkerException): - self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e), - exc_info=True) + self.logger.critical( + "Unexpected exception at _delete_task task={}: {}".format( + task_id, e + ), + exc_info=True, + ) + return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)} def _create_task(self, ro_task, task_index, task_depends, db_update): @@ -1115,6 +1576,7 @@ class NsWorker(threading.Thread): my_task = ro_task["tasks"][task_index] task_id = my_task["task_id"] task_status = None + if my_task["status"] == "FAILED": return None, None # TODO need to be retry?? elif my_task["status"] == "SCHEDULED": @@ -1122,19 +1584,29 @@ class NsWorker(threading.Thread): for index, task in enumerate(ro_task["tasks"]): if index == task_index or not task: continue # own task - if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"): + + if task["action"] == "CREATE" and task["status"] not in ( + "SCHEDULED", + "FINISHED", + "SUPERSEDED", + ): return task["status"], "COPY_VIM_INFO" try: task_status, ro_vim_item_update = self.item2class[my_task["item"]].new( - ro_task, task_index, task_depends) + ro_task, task_index, task_depends + ) # TODO update other CREATE tasks except Exception as e: if not isinstance(e, NsWorkerException): - self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) + self.logger.error( + "Error executing task={}: {}".format(task_id, e), exc_info=True + ) + task_status = "FAILED" ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} # TODO update ro_vim_item_update + return task_status, ro_vim_item_update else: return None, None @@ -1150,16 +1622,20 @@ class NsWorker(threading.Thread): :param target_id: :return: database ro_task plus index of task """ - if task_id.startswith("vim:") or task_id.startswith("sdn:") or task_id.startswith("wim:"): + if ( + task_id.startswith("vim:") + or task_id.startswith("sdn:") + or task_id.startswith("wim:") + ): target_id, _, task_id = task_id.partition(" ") if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): ro_task_dependency = self.db.get_one( "ro_tasks", - q_filter={"target_id": target_id, - "tasks.target_record_id": task_id - }, - fail_on_empty=False) + q_filter={"target_id": target_id, "tasks.target_record_id": task_id}, + fail_on_empty=False, + ) + if ro_task_dependency: for task_index, task in enumerate(ro_task_dependency["tasks"]): if task["target_record_id"] == task_id: @@ -1170,12 +1646,16 @@ class NsWorker(threading.Thread): for task_index, task in enumerate(ro_task["tasks"]): if task and task["task_id"] == task_id: return ro_task, task_index + ro_task_dependency = self.db.get_one( "ro_tasks", - q_filter={"tasks.ANYINDEX.task_id": task_id, - "tasks.ANYINDEX.target_record.ne": None - }, - fail_on_empty=False) + q_filter={ + "tasks.ANYINDEX.task_id": task_id, + "tasks.ANYINDEX.target_record.ne": None, + }, + fail_on_empty=False, + ) + if ro_task_dependency: for task_index, task in ro_task_dependency["tasks"]: if task["task_id"] == task_id: @@ -1185,7 +1665,8 @@ class NsWorker(threading.Thread): def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() - next_check_at = now + (24*60*60) # one day + # one day + next_check_at = now + (24 * 60 * 60) db_ro_task_update = {} def _update_refresh(new_status): @@ -1196,6 +1677,7 @@ class NsWorker(threading.Thread): nonlocal ro_task next_refresh = time.time() + if task["item"] in ("image", "flavor"): next_refresh += self.REFRESH_IMAGE elif new_status == "BUILD": @@ -1204,76 +1686,152 @@ class NsWorker(threading.Thread): next_refresh += self.REFRESH_ACTIVE else: next_refresh += self.REFRESH_ERROR + next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh ro_task["vim_info"]["refresh_at"] = next_refresh try: - # 0 get task_status_create + # 0: get task_status_create + lock_object = None task_status_create = None - task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and - t["status"] in ("BUILD", "DONE")), None) + task_create = next( + ( + t + for t in ro_task["tasks"] + if t + and t["action"] == "CREATE" + and t["status"] in ("BUILD", "DONE") + ), + None, + ) + if task_create: task_status_create = task_create["status"] - # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD + + # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD for task_action in ("DELETE", "CREATE", "EXEC"): db_vim_update = None new_status = None + for task_index, task in enumerate(ro_task["tasks"]): if not task: continue # task deleted + task_depends = {} target_update = None - if (task_action in ("DELETE", "EXEC") and task["status"] not in ("SCHEDULED", "BUILD")) or \ - task["action"] != task_action or \ - (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")): + + if ( + ( + task_action in ("DELETE", "EXEC") + and task["status"] not in ("SCHEDULED", "BUILD") + ) + or task["action"] != task_action + or ( + task_action == "CREATE" + and task["status"] in ("FINISHED", "SUPERSEDED") + ) + ): continue + task_path = "tasks.{}.status".format(task_index) try: db_vim_info_update = None + if task["status"] == "SCHEDULED": # check if tasks that this depends on have been completed dependency_not_completed = False - for dependency_task_id in (task.get("depends_on") or ()): - dependency_ro_task, dependency_task_index = \ - self._get_dependency(dependency_task_id, target_id=ro_task["target_id"]) - dependency_task = dependency_ro_task["tasks"][dependency_task_index] + + for dependency_task_id in task.get("depends_on") or (): + ( + dependency_ro_task, + dependency_task_index, + ) = self._get_dependency( + dependency_task_id, target_id=ro_task["target_id"] + ) + dependency_task = dependency_ro_task["tasks"][ + dependency_task_index + ] + if dependency_task["status"] == "SCHEDULED": dependency_not_completed = True - next_check_at = min(next_check_at, dependency_ro_task["to_check_at"]) + next_check_at = min( + next_check_at, dependency_ro_task["to_check_at"] + ) + # must allow dependent task to be processed first + # to do this set time after last_task_processed + next_check_at = max( + self.time_last_task_processed, next_check_at + ) break elif dependency_task["status"] == "FAILED": error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format( - task["action"], task["item"], dependency_task["action"], - dependency_task["item"], dependency_task_id, - dependency_ro_task["vim_info"].get("vim_details")) - self.logger.error("task={} {}".format(task["task_id"], error_text)) + task["action"], + task["item"], + dependency_task["action"], + dependency_task["item"], + dependency_task_id, + dependency_ro_task["vim_info"].get( + "vim_details" + ), + ) + self.logger.error( + "task={} {}".format(task["task_id"], error_text) + ) raise NsWorkerException(error_text) - task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"] - task_depends["TASK-{}".format(dependency_task_id)] = \ - dependency_ro_task["vim_info"]["vim_id"] + task_depends[dependency_task_id] = dependency_ro_task[ + "vim_info" + ]["vim_id"] + task_depends[ + "TASK-{}".format(dependency_task_id) + ] = dependency_ro_task["vim_info"]["vim_id"] + if dependency_not_completed: # TODO set at vim_info.vim_details that it is waiting continue + # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew + # the task of renew this locking. It will update database locket_at periodically + if not lock_object: + lock_object = LockRenew.add_lock_object( + "ro_tasks", ro_task, self + ) + if task["action"] == "DELETE": - new_status, db_vim_info_update = self._delete_task(ro_task, task_index, - task_depends, db_ro_task_update) - new_status = "FINISHED" if new_status == "DONE" else new_status + (new_status, db_vim_info_update,) = self._delete_task( + ro_task, task_index, task_depends, db_ro_task_update + ) + new_status = ( + "FINISHED" if new_status == "DONE" else new_status + ) # ^with FINISHED instead of DONE it will not be refreshing + if new_status in ("FINISHED", "SUPERSEDED"): target_update = "DELETE" elif task["action"] == "EXEC": - new_status, db_vim_info_update, db_task_update = self.item2class[task["item"]].exec( - ro_task, task_index, task_depends) - new_status = "FINISHED" if new_status == "DONE" else new_status + ( + new_status, + db_vim_info_update, + db_task_update, + ) = self.item2class[task["item"]].exec( + ro_task, task_index, task_depends + ) + new_status = ( + "FINISHED" if new_status == "DONE" else new_status + ) # ^with FINISHED instead of DONE it will not be refreshing + if db_task_update: # load into database the modified db_task_update "retries" and "next_retry" if db_task_update.get("retries"): - db_ro_task_update["tasks.{}.retries".format(task_index)] = db_task_update["retries"] - next_check_at = time.time() + db_task_update.get("next_retry", 60) + db_ro_task_update[ + "tasks.{}.retries".format(task_index) + ] = db_task_update["retries"] + + next_check_at = time.time() + db_task_update.get( + "next_retry", 60 + ) target_update = None elif task["action"] == "CREATE": if task["status"] == "SCHEDULED": @@ -1281,33 +1839,55 @@ class NsWorker(threading.Thread): new_status = task_status_create target_update = "COPY_VIM_INFO" else: - new_status, db_vim_info_update = \ - self.item2class[task["item"]].new(ro_task, task_index, task_depends) + new_status, db_vim_info_update = self.item2class[ + task["item"] + ].new(ro_task, task_index, task_depends) # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]: - new_status, db_vim_info_update = self.item2class[task["item"]].refresh(ro_task) + if ( + ro_task["vim_info"]["refresh_at"] + and now > ro_task["vim_info"]["refresh_at"] + ): + new_status, db_vim_info_update = self.item2class[ + task["item"] + ].refresh(ro_task) _update_refresh(new_status) + except Exception as e: new_status = "FAILED" - db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} - if not isinstance(e, (NsWorkerException, vimconn.VimConnException)): - self.logger.error("Unexpected exception at _delete_task task={}: {}". - format(task["task_id"], e), exc_info=True) + db_vim_info_update = { + "vim_status": "VIM_ERROR", + "vim_details": str(e), + } + + if not isinstance( + e, (NsWorkerException, vimconn.VimConnException) + ): + self.logger.error( + "Unexpected exception at _delete_task task={}: {}".format( + task["task_id"], e + ), + exc_info=True, + ) try: if db_vim_info_update: db_vim_update = db_vim_info_update.copy() - db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()}) + db_ro_task_update.update( + { + "vim_info." + k: v + for k, v in db_vim_info_update.items() + } + ) ro_task["vim_info"].update(db_vim_info_update) if new_status: if task_action == "CREATE": task_status_create = new_status db_ro_task_update[task_path] = new_status - if target_update or db_vim_update: + if target_update or db_vim_update: if target_update == "DELETE": self._update_target(task, None) elif target_update == "COPY_VIM_INFO": @@ -1316,76 +1896,144 @@ class NsWorker(threading.Thread): self._update_target(task, db_vim_update) except Exception as e: - if isinstance(e, DbException) and e.http_code == HTTPStatus.NOT_FOUND: + if ( + isinstance(e, DbException) + and e.http_code == HTTPStatus.NOT_FOUND + ): # if the vnfrs or nsrs has been removed from database, this task must be removed - self.logger.debug("marking to delete task={}".format(task["task_id"])) + self.logger.debug( + "marking to delete task={}".format(task["task_id"]) + ) self.tasks_to_delete.append(task) else: - self.logger.error("Unexpected exception at _update_target task={}: {}". - format(task["task_id"], e), exc_info=True) - - q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]} + self.logger.error( + "Unexpected exception at _update_target task={}: {}".format( + task["task_id"], e + ), + exc_info=True, + ) + + locked_at = ro_task["locked_at"] + + if lock_object: + locked_at = [ + lock_object["locked_at"], + lock_object["locked_at"] + self.task_locked_time, + ] + # locked_at contains two times to avoid race condition. In case the lock has been renew, it will + # contain exactly locked_at + self.task_locked_time + LockRenew.remove_lock_object(lock_object) + + q_filter = { + "_id": ro_task["_id"], + "to_check_at": ro_task["to_check_at"], + "locked_at": locked_at, + } # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, # outside this task (by ro_nbi) do not update it db_ro_task_update["locked_by"] = None # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked - db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED) + db_ro_task_update["locked_at"] = int(now - self.task_locked_time) + db_ro_task_update["modified_at"] = now db_ro_task_update["to_check_at"] = next_check_at - if not self.db.set_one("ro_tasks", - update_dict=db_ro_task_update, - q_filter=q_filter, - fail_on_empty=False): + + if not self.db.set_one( + "ro_tasks", + update_dict=db_ro_task_update, + q_filter=q_filter, + fail_on_empty=False, + ): del db_ro_task_update["to_check_at"] del q_filter["to_check_at"] - self.db.set_one("ro_tasks", - q_filter=q_filter, - update_dict=db_ro_task_update, - fail_on_empty=True) + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) except DbException as e: - self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e)) + self.logger.error( + "ro_task={} Error updating database {}".format(ro_task_id, e) + ) except Exception as e: - self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True) + self.logger.error( + "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True + ) def _update_target(self, task, ro_vim_item_update): table, _, temp = task["target_record"].partition(":") _id, _, path_vim_status = temp.partition(":") - path_item = path_vim_status[:path_vim_status.rfind(".")] - path_item = path_item[:path_item.rfind(".")] + path_item = path_vim_status[: path_vim_status.rfind(".")] + path_item = path_item[: path_item.rfind(".")] # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id" # path_item: dot separated list targeting record information, e.g. "vdur.10" + if ro_vim_item_update: - update_dict = {path_vim_status + "." + k: v for k, v in ro_vim_item_update.items() if k in - ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')} + update_dict = { + path_vim_status + "." + k: v + for k, v in ro_vim_item_update.items() + if k + in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces") + } + if path_vim_status.startswith("vdur."): # for backward compatibility, add vdur.name apart from vdur.vim_name if ro_vim_item_update.get("vim_name"): update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"] + # for backward compatibility, add vdur.vim-id apart from vdur.vim_id if ro_vim_item_update.get("vim_id"): update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"] + # update general status if ro_vim_item_update.get("vim_status"): - update_dict[path_item + ".status"] = ro_vim_item_update["vim_status"] + update_dict[path_item + ".status"] = ro_vim_item_update[ + "vim_status" + ] + if ro_vim_item_update.get("interfaces"): path_interfaces = path_item + ".interfaces" + for i, iface in enumerate(ro_vim_item_update.get("interfaces")): if iface: - update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if - k in ('vlan', 'compute_node', 'pci')}) + update_dict.update( + { + path_interfaces + ".{}.".format(i) + k: v + for k, v in iface.items() + if k in ("vlan", "compute_node", "pci") + } + ) + # put ip_address and mac_address with ip-address and mac-address - if iface.get('ip_address'): - update_dict[path_interfaces + ".{}.".format(i) + "ip-address"] = iface['ip_address'] - if iface.get('mac_address'): - update_dict[path_interfaces + ".{}.".format(i) + "mac-address"] = iface['mac_address'] + if iface.get("ip_address"): + update_dict[ + path_interfaces + ".{}.".format(i) + "ip-address" + ] = iface["ip_address"] + + if iface.get("mac_address"): + update_dict[ + path_interfaces + ".{}.".format(i) + "mac-address" + ] = iface["mac_address"] + if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): - update_dict["ip-address"] = iface.get("ip_address").split(";")[0] + update_dict["ip-address"] = iface.get("ip_address").split( + ";" + )[0] + if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): - update_dict[path_item + ".ip-address"] = iface.get("ip_address").split(";")[0] + update_dict[path_item + ".ip-address"] = iface.get( + "ip_address" + ).split(";")[0] self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) else: update_dict = {path_item + ".status": "DELETED"} - self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict, unset={path_vim_status: None}) + self.db.set_one( + table, + q_filter={"_id": _id}, + update_dict=update_dict, + unset={path_vim_status: None}, + ) def _process_delete_db_tasks(self): """ @@ -1396,14 +2044,18 @@ class NsWorker(threading.Thread): task = self.tasks_to_delete[0] vnfrs_deleted = None nsr_id = task["nsr_id"] + if task["target_record"].startswith("vnfrs:"): # check if nsrs is present if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False): vnfrs_deleted = task["target_record"].split(":")[1] + try: self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted) except Exception as e: - self.logger.error("Error deleting task={}: {}".format(task["task_id"], e)) + self.logger.error( + "Error deleting task={}: {}".format(task["task_id"], e) + ) self.tasks_to_delete.pop(0) @staticmethod @@ -1420,29 +2072,45 @@ class NsWorker(threading.Thread): ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) now = time.time() conflict = False + for ro_task in ro_tasks: db_update = {} to_delete_ro_task = True + for index, task in enumerate(ro_task["tasks"]): if not task: pass - elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or \ - (vnfrs_deleted and task["target_record"].startswith("vnfrs:"+vnfrs_deleted)): + elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or ( + vnfrs_deleted + and task["target_record"].startswith("vnfrs:" + vnfrs_deleted) + ): db_update["tasks.{}".format(index)] = None else: - to_delete_ro_task = False # used by other nsr, ro_task cannot be deleted + # used by other nsr, ro_task cannot be deleted + to_delete_ro_task = False + # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed if to_delete_ro_task: - if not db.del_one("ro_tasks", - q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]}, - fail_on_empty=False): + if not db.del_one( + "ro_tasks", + q_filter={ + "_id": ro_task["_id"], + "modified_at": ro_task["modified_at"], + }, + fail_on_empty=False, + ): conflict = True elif db_update: db_update["modified_at"] = now - if not db.set_one("ro_tasks", - q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]}, - update_dict=db_update, - fail_on_empty=False): + if not db.set_one( + "ro_tasks", + q_filter={ + "_id": ro_task["_id"], + "modified_at": ro_task["modified_at"], + }, + update_dict=db_update, + fail_on_empty=False, + ): conflict = True if not conflict: return @@ -1451,27 +2119,40 @@ class NsWorker(threading.Thread): def run(self): # load database - self.logger.debug("Starting") + self.logger.info("Starting") while True: # step 1: get commands from queue try: - task = self.task_queue.get(block=False if self.my_vims else True) + if self.vim_targets: + task = self.task_queue.get(block=False) + else: + if not self.idle: + self.logger.debug("enters in idle state") + self.idle = True + task = self.task_queue.get(block=True) + self.idle = False + if task[0] == "terminate": break elif task[0] == "load_vim": + self.logger.info("order to load vim {}".format(task[1])) self._load_vim(task[1]) elif task[0] == "unload_vim": + self.logger.info("order to unload vim {}".format(task[1])) self._unload_vim(task[1]) elif task[0] == "reload_vim": self._reload_vim(task[1]) elif task[0] == "check_vim": + self.logger.info("order to check vim {}".format(task[1])) self._check_vim(task[1]) continue except Exception as e: if isinstance(e, queue.Empty): pass else: - self.logger.critical("Error processing task: {}".format(e), exc_info=True) + self.logger.critical( + "Error processing task: {}".format(e), exc_info=True + ) # step 2: process pending_tasks, delete not needed tasks try: @@ -1485,6 +2166,8 @@ class NsWorker(threading.Thread): if not busy: time.sleep(5) except Exception as e: - self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True) + self.logger.critical( + "Unexpected exception at run: " + str(e), exc_info=True + ) - self.logger.debug("Finishing") + self.logger.info("Finishing")