X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;fp=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=0b96c53619cdfbed424b0f21027250f9705ce773;hb=1d213f4c8825da8347ffb07b3eaa1315f0fab698;hp=0000000000000000000000000000000000000000;hpb=6114d94a7ad6da980b1f4b595f60e4cabe05526b;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py new file mode 100644 index 00000000..0b96c536 --- /dev/null +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -0,0 +1,919 @@ +# -*- coding: utf-8 -*- + +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +## + +"""" +This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. +The tasks are stored at database in table ro_tasks +A single ro_task refers to a VIM element (flavor, image, network, ...). +A ro_task can contain several 'tasks', each one with a target, where to store the results +""" + +import threading +import time +import queue +import logging +from pkg_resources import iter_entry_points +# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version +from osm_common.dbbase import DbException +# from osm_common.fsbase import FsException +# from osm_common.msgbase import MsgException +from osm_ro_plugin.vim_dummy import VimDummyConnector +from osm_ro_plugin import vimconn +from copy import deepcopy +from unittest.mock import Mock + +__author__ = "Alfonso Tierno" +__date__ = "$28-Sep-2017 12:07:15$" + + +def deep_get(target_dict, *args, **kwargs): + """ + Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None + Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None + :param target_dict: dictionary to be read + :param args: list of keys to read from target_dict + :param kwargs: only can contain default=value to return if key is not present in the nested dictionary + :return: The wanted value if exist, None or default otherwise + """ + for key in args: + if not isinstance(target_dict, dict) or key not in target_dict: + return kwargs.get("default") + target_dict = target_dict[key] + return target_dict + + +class NsWorkerException(Exception): + pass + + +class FailingConnector: + def __init__(self, error_msg): + self.error_msg = error_msg + for method in dir(vimconn.VimConnector): + if method[0] != "_": + setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg))) + + +class NsWorkerExceptionNotFound(NsWorkerException): + pass + + +class NsWorker(threading.Thread): + REFRESH_BUILD = 5 # 5 seconds + REFRESH_ACTIVE = 60 # 1 minute + REFRESH_ERROR = 600 + REFRESH_IMAGE = 3600 * 10 + REFRESH_DELETE = 3600 * 10 + QUEUE_SIZE = 2000 + # TODO delete assigment_lock = Lock() + terminate = False + # TODO delete assignment = {} + MAX_TIME_LOCKED = 3600 + + def __init__(self, worker, config, plugins, db): + """Init a thread. + Arguments: + 'id' number of thead + 'name' name of thread + 'host','user': host ip or name to manage and user + 'db', 'db_lock': database class and lock to use it in exclusion + """ + threading.Thread.__init__(self) + self.config = config + self.plugins = plugins + self.plugin_name = "unknown" + self.logger = logging.getLogger('ro.worker{}'.format("worker")) + self.worker_id = worker + self.task_queue = queue.Queue(self.QUEUE_SIZE) + self.my_vims = {} # targetvim: vimplugin class + self.db_vims = {} # targetvim: vim information from database + self.vim_targets = [] # targetvim list + self.my_id = config["process_id"] + ":" + str(worker) + self.db = db + self.item2create = { + "net": self.new_net, + "vdu": self.new_vm, + "image": self.new_image, + "flavor": self.new_flavor, + } + self.item2refresh = { + "net": self.refresh_net, + "vdu": self.refresh_vm, + "image": self.refresh_ok, + "flavor": self.refresh_ok, + } + self.item2delete = { + "net": self.del_net, + "vdu": self.del_vm, + "image": self.delete_ok, + "flavor": self.del_flavor, + } + self.item2action = { + "vdu": self.exec_vm, + } + self.time_last_task_processed = None + + def insert_task(self, task): + try: + self.task_queue.put(task, False) + return None + except queue.Full: + raise NsWorkerException("timeout inserting a task") + + def terminate(self): + self.insert_task("exit") + + def del_task(self, task): + with self.task_lock: + if task["status"] == "SCHEDULED": + task["status"] = "SUPERSEDED" + return True + else: # task["status"] == "processing" + self.task_lock.release() + return False + + def _load_plugin(self, name, type="vim"): + # type can be vim or sdn + if "rovim_dummy" not in self.plugins: + self.plugins["rovim_dummy"] = VimDummyConnector + if name in self.plugins: + return self.plugins[name] + try: + for v in iter_entry_points('osm_ro{}.plugins'.format(type), name): + self.plugins[name] = v.load() + except Exception as e: + self.logger.critical("Cannot load osm_{}: {}".format(name, e)) + if name: + self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e)) + if name and name not in self.plugins: + error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \ + " registered".format(t=type, n=name) + self.logger.critical(error_text) + self.plugins[name] = FailingConnector(error_text) + + return self.plugins[name] + + def _load_vim(self, vim_account_id): + target_id = "vim:" + vim_account_id + plugin_name = "" + vim = None + try: + step = "Getting vim={} from db".format(vim_account_id) + vim = self.db.get_one("vim_accounts", {"_id": vim_account_id}) + + # if deep_get(vim, "config", "sdn-controller"): + # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) + # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) + + step = "Decrypt password" + schema_version = vim.get("schema_version") + self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'), + schema_version=schema_version, salt=vim_account_id) + + step = "Load plugin 'rovim_{}'".format(vim.get("vim_type")) + plugin_name = "rovim_" + vim["vim_type"] + vim_module_conn = self._load_plugin(plugin_name) + self.my_vims[target_id] = vim_module_conn( + uuid=vim['_id'], name=vim['name'], + tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'), + url=vim['vim_url'], url_admin=None, + user=vim['vim_user'], passwd=vim['vim_password'], + config=vim.get('config'), persistent_info={} + ) + self.vim_targets.append(target_id) + self.db_vims[target_id] = vim + self.error_status = None + self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format( + vim_account_id, plugin_name)) + except Exception as e: + self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format( + vim_account_id, plugin_name, step, e)) + self.db_vims[target_id] = vim or {} + self.my_vims[target_id] = FailingConnector(str(e)) + self.error_status = "Error loading vimconnector: {}".format(e) + + def _get_db_task(self): + """ + Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions + :return: None + """ + now = time.time() + if not self.time_last_task_processed: + self.time_last_task_processed = now + try: + while True: + locked = self.db.set_one( + "ro_tasks", + q_filter={"target_id": self.vim_targets, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], + "locked_at.lt": now - self.MAX_TIME_LOCKED, + "to_check_at.lt": self.time_last_task_processed}, + update_dict={"locked_by": self.my_id, "locked_at": now}, + fail_on_empty=False) + if locked: + # read and return + ro_task = self.db.get_one( + "ro_tasks", + q_filter={"target_id": self.vim_targets, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], + "locked_at": now}) + return ro_task + if self.time_last_task_processed == now: + self.time_last_task_processed = None + return None + else: + self.time_last_task_processed = now + # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) + + except DbException as e: + self.logger.error("Database exception at _get_db_task: {}".format(e)) + except Exception as e: + self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True) + return None + + def _delete_task(self, ro_task, task_index, task_depends, db_update): + """ + Determine if this task need to be done or superseded + :return: None + """ + my_task = ro_task["tasks"][task_index] + task_id = my_task["task_id"] + needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False) + if my_task["status"] == "FAILED": + return None, None # TODO need to be retry?? + try: + for index, task in enumerate(ro_task["tasks"]): + if index == task_index: + continue # own task + if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE": + # set to finished + db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED" + elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"): + needed_delete = False + if needed_delete: + return self.item2delete[my_task["item"]](ro_task, task_index) + else: + return "SUPERSEDED", None + except Exception as e: + if not isinstance(e, NsWorkerException): + self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e), + exc_info=True) + return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)} + + def _create_task(self, ro_task, task_index, task_depends, db_update): + """ + Determine if this task need to be created + :return: None + """ + my_task = ro_task["tasks"][task_index] + task_id = my_task["task_id"] + task_status = None + if my_task["status"] == "FAILED": + return None, None # TODO need to be retry?? + elif my_task["status"] == "SCHEDULED": + # check if already created by another task + for index, task in enumerate(ro_task["tasks"]): + if index == task_index: + continue # own task + if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"): + return task["status"], "COPY_VIM_INFO" + + try: + task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends) + # TODO update other CREATE tasks + except Exception as e: + if not isinstance(e, NsWorkerException): + self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) + task_status = "FAILED" + ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} + # TODO update ro_vim_item_update + return task_status, ro_vim_item_update + else: + return None, None + + def _get_dependency(self, task_id, ro_task=None, target_id=None): + if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): + ro_task_dependency = self.db.get_one( + "ro_tasks", + q_filter={"target_id": target_id, + "tasks.target_record_id": task_id + }, + fail_on_empty=False) + if ro_task_dependency: + for task_index, task in enumerate(ro_task_dependency["tasks"]): + if task["target_record_id"] == task_id: + return ro_task_dependency, task_index + + else: + if ro_task: + for task_index, task in enumerate(ro_task["tasks"]): + if task["task_id"] == task_id: + return ro_task, task_index + ro_task_dependency = self.db.get_one( + "ro_tasks", + q_filter={"tasks.ANYINDEX.task_id": task_id, + "tasks.ANYINDEX.target_record.ne": None + }, + fail_on_empty=False) + if ro_task_dependency: + for task_index, task in ro_task_dependency["tasks"]: + if task["task_id"] == task_id: + return ro_task_dependency, task_index + raise NsWorkerException("Cannot get depending task {}".format(task_id)) + + def _proccess_pending_tasks(self, ro_task): + ro_task_id = ro_task["_id"] + now = time.time() + next_check_at = now + (24*60*60) # one day + db_ro_task_update = {} + + def _update_refresh(new_status): + # compute next_refresh + nonlocal task + nonlocal next_check_at + nonlocal db_ro_task_update + nonlocal ro_task + + next_refresh = time.time() + if task["item"] in ("image", "flavor"): + next_refresh += self.REFRESH_IMAGE + elif new_status == "BUILD": + next_refresh += self.REFRESH_BUILD + elif new_status == "DONE": + next_refresh += self.REFRESH_ACTIVE + else: + next_refresh += self.REFRESH_ERROR + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + ro_task["vim_info"]["refresh_at"] = next_refresh + + try: + # 0 get task_status_create + task_status_create = None + task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and + t["status"] in ("BUILD", "DONE")), None) + if task_create: + task_status_create = task_create["status"] + # 1. look for SCHEDULED or if CREATE also DONE,BUILD + for task_action in ("DELETE", "CREATE", "EXEC"): + db_vim_update = None + for task_index, task in enumerate(ro_task["tasks"]): + target_update = None + if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\ + task["action"] != task_action or \ + (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")): + continue + task_path = "tasks.{}.status".format(task_index) + try: + if task["status"] == "SCHEDULED": + task_depends = {} + # check if tasks that this depends on have been completed + dependency_not_completed = False + for dependency_task_id in (task.get("depends_on") or ()): + dependency_ro_task, dependency_task_index = \ + self._get_dependency(dependency_task_id, target_id=ro_task["target_id"]) + dependency_task = dependency_ro_task["tasks"][dependency_task_index] + if dependency_task["status"] == "SCHEDULED": + dependency_not_completed = True + next_check_at = min(next_check_at, dependency_ro_task["to_check_at"]) + break + elif dependency_task["status"] == "FAILED": + error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format( + task["action"], task["item"], dependency_task["action"], + dependency_task["item"], dependency_task_id, + dependency_ro_task["vim_info"].get("vim_details")) + self.logger.error("task={} {}".format(task["task_id"], error_text)) + raise NsWorkerException(error_text) + + task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"] + task_depends["TASK-{}".format(dependency_task_id)] = \ + dependency_ro_task["vim_info"]["vim_id"] + if dependency_not_completed: + # TODO set at vim_info.vim_details that it is waiting + continue + + if task["action"] == "DELETE": + new_status, db_vim_info_update = self._delete_task(ro_task, task_index, + task_depends, db_ro_task_update) + new_status = "FINISHED" if new_status == "DONE" else new_status + # ^with FINISHED instead of DONE it will not be refreshing + if new_status in ("FINISHED", "SUPERSEDED"): + target_update = "DELETE" + elif task["action"] == "EXEC": + self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update) + new_status = "FINISHED" if new_status == "DONE" else new_status + # ^with FINISHED instead of DONE it will not be refreshing + if new_status in ("FINISHED", "SUPERSEDED"): + target_update = "DELETE" + elif task["action"] == "CREATE": + if task["status"] == "SCHEDULED": + if task_status_create: + new_status = task_status_create + target_update = "COPY_VIM_INFO" + else: + new_status, db_vim_info_update = \ + self.item2create[task["item"]](ro_task, task_index, task_depends) + # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) + _update_refresh(new_status) + else: + if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]: + new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task) + _update_refresh(new_status) + except Exception as e: + new_status = "FAILED" + db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} + if not isinstance(e, (NsWorkerException, vimconn.VimConnException)): + self.logger.error("Unexpected exception at _delete_task task={}: {}". + format(task["task_id"], e), exc_info=True) + + try: + if db_vim_info_update: + db_vim_update = db_vim_info_update.copy() + db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()}) + ro_task["vim_info"].update(db_vim_info_update) + + if new_status: + if task_action == "CREATE": + task_status_create = new_status + db_ro_task_update[task_path] = new_status + if target_update or db_vim_update: + + if target_update == "DELETE": + self._update_target(task, None) + elif target_update == "COPY_VIM_INFO": + self._update_target(task, ro_task["vim_info"]) + else: + self._update_target(task, db_vim_update) + + except Exception as e: + self.logger.error("Unexpected exception at _update_target task={}: {}". + format(task["task_id"], e), exc_info=True) + + # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, + # outside this task (by ro_nbi) do not update it + db_ro_task_update["locked_by"] = None + # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked + db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED) + db_ro_task_update["to_check_at"] = next_check_at + if not self.db.set_one("ro_tasks", + update_dict=db_ro_task_update, + q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]}, + fail_on_empty=False): + del db_ro_task_update["to_check_at"] + self.db.set_one("ro_tasks", + q_filter={"_id": ro_task["_id"]}, + update_dict=db_ro_task_update, + fail_on_empty=True) + except DbException as e: + self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e)) + except Exception as e: + self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True) + + def _update_target(self, task, ro_vim_item_update): + try: + table, _id, path = task["target_record"].split(":") + if ro_vim_item_update: + update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in + ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')} + if ro_vim_item_update.get("interfaces"): + path_vdu = path[:path.rfind(".")] + path_vdu = path_vdu[:path_vdu.rfind(".")] + path_interfaces = path_vdu + ".interfaces" + for i, iface in enumerate(ro_vim_item_update.get("interfaces")): + if iface: + update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if + k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')}) + if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): + update_dict["ip-address"] = iface.get("ip_address").split(";")[0] + if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): + update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0] + + self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) + else: + self.db.set_one(table, q_filter={"_id": _id}, update_dict=None, + unset={path: None}) + except DbException as e: + self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e)) + + def new_image(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + # FIND + if task.get("find_params"): + vim_images = target_vim.get_image_list(**task["find_params"]) + if not vim_images: + raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format( + task["find_params"])) + elif len(vim_images) > 1: + raise NsWorkerException( + "More than one network found with this criteria: '{}'".format(task["find_params"])) + else: + vim_image_id = vim_images[0]["id"] + + ro_vim_item_update = {"vim_id": vim_image_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None} + self.logger.debug( + "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created)) + return "DONE", ro_vim_item_update + except (NsWorkerException, vimconn.VimConnException) as e: + self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def del_flavor(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + flavor_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = {"vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None} + try: + if flavor_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_flavor(flavor_vim_id) + + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + + except vimconn.VimConnException as e: + self.logger.error("ro_task={} vim={} del-flavor={}: {}".format( + ro_task["_id"], ro_task["target_id"], flavor_vim_id, e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e)} + return "FAILED", ro_vim_item_update + + self.logger.debug("task={} {} del-flavor={} {}".format( + task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", ""))) + return "DONE", ro_vim_item_update_ok + + def refresh_ok(self, ro_task): + """skip calling VIM to get image status. Assumes ok""" + if ro_task["vim_info"]["vim_status"] == "VIM_ERROR": + return "FAILED", {} + return "DONE", {} + + def delete_ok(self, ro_task): + """skip calling VIM to delete image status. Assumes ok""" + return "DONE", {} + + def new_flavor(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + # FIND + vim_flavor_id = None + if task.get("find_params"): + try: + flavor_data = task["find_params"]["flavor_data"] + vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) + except vimconn.VimConnNotFoundException: + pass + + if not vim_flavor_id and task.get("params"): + # CREATE + flavor_data = task["params"]["flavor_data"] + vim_flavor_id = target_vim.new_flavor(flavor_data) + created = True + + ro_vim_item_update = {"vim_id": vim_flavor_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None} + self.logger.debug( + "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created)) + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def new_net(self, ro_task, task_index, task_depends): + vim_net_id = None + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + # FIND + if task.get("find_params"): + # if management, get configuration of VIM + if task["find_params"].get("filter_dict"): + vim_filter = task["find_params"]["filter_dict"] + elif task["find_params"].get("mgmt"): # mamagement network + if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"): + vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]} + elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"): + vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]} + else: + vim_filter = {"name": task["find_params"]["name"]} + else: + raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"])) + + vim_nets = target_vim.get_network_list(vim_filter) + if not vim_nets and not task.get("params"): + raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format( + task.get("find_params"))) + elif len(vim_nets) > 1: + raise NsWorkerException( + "More than one network found with this criteria: '{}'".format(task["find_params"])) + if vim_nets: + vim_net_id = vim_nets[0]["id"] + else: + # CREATE + params = task["params"] + vim_net_id, created_items = target_vim.new_network(**params) + created = True + + ro_vim_item_update = {"vim_id": vim_net_id, + "vim_status": "BUILD", + "created": created, + "created_items": created_items, + "vim_details": None} + self.logger.debug( + "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created)) + return "BUILD", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def refresh_net(self, ro_task): + """Call VIM to get network status""" + ro_task_id = ro_task["_id"] + target_vim = self.my_vims[ro_task["target_id"]] + + vim_id = ro_task["vim_info"]["vim_id"] + net_to_refresh_list = [vim_id] + try: + vim_dict = target_vim.refresh_nets_status(net_to_refresh_list) + vim_info = vim_dict[vim_id] + if vim_info["status"] == "ACTIVE": + task_status = "DONE" + elif vim_info["status"] == "BUILD": + task_status = "BUILD" + else: + task_status = "FAILED" + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + task_status = "FAILED" + + ro_vim_item_update = {} + if ro_task["vim_info"]["vim_status"] != vim_info["status"]: + ro_vim_item_update["vim_status"] = vim_info["status"] + if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): + ro_vim_item_update["vim_name"] = vim_info.get("name") + if vim_info["status"] in ("ERROR", "VIM_ERROR"): + if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]: + ro_vim_item_update["vim_details"] = vim_info["error_msg"] + elif vim_info["status"] == "DELETED": + ro_vim_item_update["vim_id"] = None + ro_vim_item_update["vim_details"] = "Deleted externally" + else: + if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: + ro_vim_item_update["vim_details"] = vim_info["vim_info"] + if ro_vim_item_update: + self.logger.debug("ro_task={} {} get-net={}: status={} {}".format( + ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), + ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) + return task_status, ro_vim_item_update + + def del_net(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + net_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = {"vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None} + try: + if net_vim_id or ro_task["vim_info"]["created_items"]: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"]) + + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + + except vimconn.VimConnException as e: + self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"], + net_vim_id, e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e)} + return "FAILED", ro_vim_item_update + + self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id, + ro_vim_item_update_ok.get("vim_details", ""))) + return "DONE", ro_vim_item_update_ok + + def new_vm(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + created = True + params = task["params"] + params_copy = deepcopy(params) + net_list = params_copy["net_list"] + for net in net_list: + if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id + network_id = task_depends[net["net_id"]] + if not network_id: + raise NsWorkerException("Cannot create VM because depends on a network not created or found " + "for {}".format(net["net_id"])) + net["net_id"] = network_id + if params_copy["image_id"].startswith("TASK-"): + params_copy["image_id"] = task_depends[params_copy["image_id"]] + if params_copy["flavor_id"].startswith("TASK-"): + params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] + + vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) + interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] + + ro_vim_item_update = {"vim_id": vim_vm_id, + "vim_status": "BUILD", + "created": created, + "created_items": created_items, + "vim_details": None, + "interfaces_vim_ids": interfaces, + "interfaces": [], + } + self.logger.debug( + "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created)) + return "BUILD", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def del_vm(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + vm_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = {"vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None} + try: + if vm_vim_id or ro_task["vim_info"]["created_items"]: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"]) + + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + + except vimconn.VimConnException as e: + self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"], + vm_vim_id, e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e)} + return "FAILED", ro_vim_item_update + + self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id, + ro_vim_item_update_ok.get("vim_details", ""))) + return "DONE", ro_vim_item_update_ok + + def refresh_vm(self, ro_task): + """Call VIM to get vm status""" + ro_task_id = ro_task["_id"] + target_vim = self.my_vims[ro_task["target_id"]] + + vim_id = ro_task["vim_info"]["vim_id"] + if not vim_id: + return None, None + vm_to_refresh_list = [vim_id] + try: + vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) + vim_info = vim_dict[vim_id] + if vim_info["status"] == "ACTIVE": + task_status = "DONE" + elif vim_info["status"] == "BUILD": + task_status = "BUILD" + else: + task_status = "FAILED" + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + task_status = "FAILED" + + ro_vim_item_update = {} + # TODO check and update interfaces + vim_interfaces = [] + for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]: + iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None) + # if iface: + # iface.pop("vim_info", None) + vim_interfaces.append(iface) + + task = ro_task["tasks"][0] # TODO look for a task CREATE and active + if task.get("mgmt_vnf_interface") is not None: + vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True + mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0)) + vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True + + if ro_task["vim_info"]["interfaces"] != vim_interfaces: + ro_vim_item_update["interfaces"] = vim_interfaces + if ro_task["vim_info"]["vim_status"] != vim_info["status"]: + ro_vim_item_update["vim_status"] = vim_info["status"] + if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): + ro_vim_item_update["vim_name"] = vim_info.get("name") + if vim_info["status"] in ("ERROR", "VIM_ERROR"): + if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]: + ro_vim_item_update["vim_details"] = vim_info["error_msg"] + elif vim_info["status"] == "DELETED": + ro_vim_item_update["vim_id"] = None + ro_vim_item_update["vim_details"] = "Deleted externally" + else: + if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: + ro_vim_item_update["vim_details"] = vim_info["vim_info"] + if ro_vim_item_update: + self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format( + ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), + ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) + return task_status, ro_vim_item_update + + def exec_vm(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + target_vim = self.my_vims[ro_task["target_id"]] + try: + params = task["params"] + params_copy = deepcopy(params) + params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"), + params_copy.pop("schema_version"), params_copy.pop("salt")) + + target_vim.inject_user_key(**params_copy) + self.logger.debug( + "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])) + return "DONE", params_copy["key"] + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def run(self): + # load database + self.logger.debug("Starting") + while True: + try: + task = self.task_queue.get(block=False if self.my_vims else True) + if task[0] == "terminate": + break + if task[0] == "load_vim": + self._load_vim(task[1]) + continue + except queue.Empty: + pass + + try: + busy = False + ro_task = self._get_db_task() + if ro_task: + self._proccess_pending_tasks(ro_task) + busy = True + if not busy: + time.sleep(5) + except Exception as e: + self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True) + + self.logger.debug("Finishing")