| # -*- coding: utf-8 -*- |
| |
| ## |
| # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| ## |
| |
| """ |
| This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. |
| The tasks are stored in the database, in table ro_tasks. |
| A single ro_task refers to a VIM element (flavor, image, network, ...). |
| A ro_task can contain several 'tasks', each one with a target, where the results are stored. |
| """ |
| |
| import threading |
| import time |
| import queue |
| import logging |
| from pkg_resources import iter_entry_points |
| # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version |
| from osm_common.dbbase import DbException |
| # from osm_common.fsbase import FsException |
| # from osm_common.msgbase import MsgException |
| from osm_ro_plugin.vim_dummy import VimDummyConnector |
| from osm_ro_plugin import vimconn |
| from copy import deepcopy |
| from unittest.mock import Mock |
| |
| __author__ = "Alfonso Tierno" |
| __date__ = "$28-Sep-2017 12:07:15$" |
| |
| |
def deep_get(target_dict, *args, **kwargs):
    """
    Walk down a nested dictionary following a sequence of keys.
    Example: with target_dict={a: {b: 5}}, keys (a, b) give 5; keys (a, b, c) and keys (f, h) give the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, the value returned when the path cannot be followed
    :return: the value at the end of the path. The default (None if not provided) as soon as one level is not a
        dictionary or does not contain its key
    """
    node = target_dict
    for step in args:
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            return kwargs.get("default")
    return node
| |
| |
class NsWorkerException(Exception):
    """Base class of the errors raised by the worker thread code of this module."""
| |
| |
class FailingConnector:
    """
    Replacement for a connector that could not be loaded.
    Every public attribute name of vimconn.VimConnector is set on the instance as a Mock whose call raises
    vimconn.VimConnException with the stored error message.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text kept at self.error_msg and used to build the exceptions raised by the methods
        """
        self.error_msg = error_msg
        public_names = [name for name in dir(vimconn.VimConnector) if not name.startswith("_")]
        for name in public_names:
            failing_call = Mock(side_effect=vimconn.VimConnException(error_msg))
            setattr(self, name, failing_call)
| |
| |
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when an item looked for cannot be found."""
| |
| |
class NsWorker(threading.Thread):
    """
    Thread that processes the ro_tasks stored at database for the VIM targets it has loaded.
    """
    # Seconds added to the current time to compute when an item must be refreshed again
    REFRESH_BUILD = 5  # 5 seconds, used when the new status is BUILD
    REFRESH_ACTIVE = 60  # 1 minute, used when the new status is DONE
    REFRESH_ERROR = 600  # 10 minutes, used for any other status
    REFRESH_IMAGE = 3600 * 10  # 10 hours, used for image and flavor items whatever their status
    REFRESH_DELETE = 3600 * 10  # 10 hours
    QUEUE_SIZE = 2000  # maximum number of entries of task_queue
    # TODO delete assigment_lock = Lock()
    # NOTE(review): this value is replaced by the terminate() method defined later in the class body
    terminate = False
    # TODO delete assignment = {}
    MAX_TIME_LOCKED = 3600  # seconds; a ro_task locked before now - MAX_TIME_LOCKED can be locked again
| |
def __init__(self, worker, config, plugins, db):
    """
    Init a worker thread.
    :param worker: identifier of this worker. It is part of the logger name and, together with
        config["process_id"], it forms self.my_id, the value written at 'locked_by' when a ro_task is locked
    :param config: configuration dictionary; only "process_id" is read here
    :param plugins: dictionary of connector plugins already loaded, indexed by plugin name
    :param db: database object used to read and update the ro_tasks and the vim accounts
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    # one logger per worker: format the worker identifier, not the literal text "worker"
    self.logger = logging.getLogger('ro.worker{}'.format(worker))
    self.worker_id = worker
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # lock used by del_task to change the status of a task
    self.task_lock = threading.Lock()
    self.my_vims = {}  # targetvim: vimplugin class
    self.db_vims = {}  # targetvim: vim information from database
    self.vim_targets = []  # targetvim list
    self.my_id = config["process_id"] + ":" + str(worker)
    self.db = db
    # dispatch tables: item type -> method that creates/refreshes/deletes/executes it
    self.item2create = {
        "net": self.new_net,
        "vdu": self.new_vm,
        "image": self.new_image,
        "flavor": self.new_flavor,
    }
    self.item2refresh = {
        "net": self.refresh_net,
        "vdu": self.refresh_vm,
        "image": self.refresh_ok,
        "flavor": self.refresh_ok,
    }
    self.item2delete = {
        "net": self.del_net,
        "vdu": self.del_vm,
        "image": self.delete_ok,
        "flavor": self.del_flavor,
    }
    self.item2action = {
        "vdu": self.exec_vm,
    }
    self.time_last_task_processed = None
| |
def insert_task(self, task):
    """
    Append a task to the queue of this worker without waiting.
    :param task: element to put at self.task_queue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")
    return None
| |
def terminate(self):
    """
    Queue the "exit" marker through insert_task.
    NOTE(review): presumably the main loop of the thread stops when it reads this marker; that loop is not
    visible here
    """
    exit_marker = "exit"
    self.insert_task(exit_marker)
| |
def del_task(self, task):
    """
    Supersede a task that has not been started yet.
    :param task: task dictionary. Its "status" is read and, if "SCHEDULED", changed to "SUPERSEDED"
    :return: True if the task has been marked as "SUPERSEDED", False if it was in any other status
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # any other status, e.g. task["status"] == "processing": nothing to change.
        # The lock is released by the 'with' statement; releasing it explicitly here as well would make the
        # implicit release fail with RuntimeError
        return False
| |
def _load_plugin(self, name, type="vim"):
    """
    Get a connector plugin by name, loading it from the entry points 'osm_ro<type>.plugins' the first time.
    The result is cached at self.plugins. "rovim_dummy" is always made available.
    :param name: plugin name, e.g. "rovim_<vim_type>"
    :param type: can be vim or sdn
    :return: the content of self.plugins[name]: the loaded plugin or, when it cannot be loaded or it is not
        registered, a FailingConnector instance holding the error text
    """
    known_plugins = self.plugins
    if "rovim_dummy" not in known_plugins:
        known_plugins["rovim_dummy"] = VimDummyConnector
    if name in known_plugins:
        return known_plugins[name]

    try:
        for entry_point in iter_entry_points('osm_ro{}.plugins'.format(type), name):
            known_plugins[name] = entry_point.load()
    except Exception as e:
        load_error = "Cannot load osm_{}: {}".format(name, e)
        self.logger.critical(load_error)
        if name:
            known_plugins[name] = FailingConnector(load_error)

    not_registered = bool(name) and name not in known_plugins
    if not_registered:
        error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
                     " registered".format(t=type, n=name)
        self.logger.critical(error_text)
        known_plugins[name] = FailingConnector(error_text)

    return known_plugins[name]
| |
def _load_vim(self, vim_account_id):
    """
    Load the connector of a vim account and register it under the target "vim:<vim_account_id>".
    The account is read from table "vim_accounts", fields 'password' and 'secret' are decrypted, and the plugin
    "rovim_<vim_type>" obtained with _load_plugin is instantiated.
    On any failure the error is logged, a FailingConnector is registered as connector of the target and
    self.error_status is set to the error text.
    :param vim_account_id: _id of the vim account at database
    :return: None. It updates self.my_vims, self.db_vims, self.error_status and, only on success, self.vim_targets
    """
    target_id = "vim:" + vim_account_id
    plugin_name = ""
    vim = None
    try:
        step = "Getting vim={} from db".format(vim_account_id)
        vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypt password"
        schema_version = vim.get("schema_version")
        # NOTE(review): 'password' and 'secret' are decrypted while 'vim_password' is the field passed to the
        # connector below; confirm against the vim_accounts schema
        self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
                                       schema_version=schema_version, salt=vim_account_id)

        step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
        plugin_name = "rovim_" + vim["vim_type"]
        vim_module_conn = self._load_plugin(plugin_name)
        if isinstance(vim_module_conn, FailingConnector):
            # _load_plugin reports a plugin that cannot be loaded by returning a FailingConnector instance.
            # It is not callable, so fail here with its own message instead of "object is not callable"
            raise NsWorkerException(vim_module_conn.error_msg)
        self.my_vims[target_id] = vim_module_conn(
            uuid=vim['_id'], name=vim['name'],
            tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
            url=vim['vim_url'], url_admin=None,
            user=vim['vim_user'], passwd=vim['vim_password'],
            config=vim.get('config'), persistent_info={}
        )
        self.vim_targets.append(target_id)
        self.db_vims[target_id] = vim
        self.error_status = None
        self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
            vim_account_id, plugin_name))
    except Exception as e:
        # any failure leaves a connector whose methods raise the error, so that tasks of this target fail
        self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
            vim_account_id, plugin_name, step, e))
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        self.error_status = "Error loading vimconnector: {}".format(e)
| |
def _get_db_task(self):
    """
    Lock one pending ro_task of the loaded vim targets and read it from database.
    A ro_task is a candidate when some of its tasks is SCHEDULED, BUILD, DONE or FAILED, it was locked before
    now - MAX_TIME_LOCKED and its to_check_at is lower than self.time_last_task_processed. When nothing is found
    the search is repeated once using the current time as threshold.
    :return: the ro_task locked by this worker; None if there is nothing to process or an error has been logged
    """
    now = time.time()
    if not self.time_last_task_processed:
        self.time_last_task_processed = now
    wanted_status = ('SCHEDULED', 'BUILD', 'DONE', 'FAILED')
    try:
        while True:
            lock_filter = {
                "target_id": self.vim_targets,
                "tasks.status": list(wanted_status),
                "locked_at.lt": now - self.MAX_TIME_LOCKED,
                "to_check_at.lt": self.time_last_task_processed,
            }
            locked = self.db.set_one(
                "ro_tasks",
                q_filter=lock_filter,
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False)
            if locked:
                # read the ro_task just locked; it is identified by the locked_at written above
                read_filter = {
                    "target_id": self.vim_targets,
                    "tasks.status": list(wanted_status),
                    "locked_at": now,
                }
                return self.db.get_one("ro_tasks", q_filter=read_filter)
            if self.time_last_task_processed != now:
                # nothing older than the previous threshold: try again with the current time
                self.time_last_task_processed = now
                continue
            self.time_last_task_processed = None
            return None

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
    return None
| |
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if a DELETE task needs to remove the item from the VIM or it is just superseded.
    Other CREATE tasks with the same target_record are marked as FINISHED (in memory and at db_update). The item
    is deleted only if it was created by RO and no other CREATE task is still alive.
    :param ro_task: ro_task that contains the task
    :param task_index: index of the DELETE task at ro_task["tasks"]
    :param task_depends: not used here
    :param db_update: dictionary where the changes to write at database are added
    :return: tuple (new status, vim_info update): (None, None) if the task is FAILED; ("SUPERSEDED", None) if
        there is nothing to delete; the result of the delete method of the item; or ("FAILED", error info) on
        any exception
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)
    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??
    try:
        for index, task in enumerate(ro_task["tasks"]):
            if index == task_index:
                continue  # own task
            same_target = my_task["target_record"] == task["target_record"]
            if task["action"] != "CREATE":
                continue
            if same_target:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                task["status"] = "FINISHED"
            elif task["status"] not in ("FINISHED", "SUPERSEDED"):
                # another target still uses this item
                needed_delete = False
        if not needed_delete:
            return "SUPERSEDED", None
        return self.item2delete[my_task["item"]](ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
                                 exc_info=True)
        return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
| |
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if a CREATE task needs to create the item at the VIM.
    :param ro_task: ro_task that contains the task
    :param task_index: index of the CREATE task at ro_task["tasks"]
    :param task_depends: dictionary passed to the create method of the item
    :param db_update: not used here
    :return: tuple (new status, vim_info update): (None, None) if the task is not SCHEDULED;
        (status of the other task, "COPY_VIM_INFO") if another CREATE task has already created the item; the
        result of the create method of the item; or ("FAILED", error info) on any exception
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    if my_task["status"] != "SCHEDULED":
        # FAILED (TODO need to be retry??) or any other status: nothing to do
        return None, None

    # check if already created by another task
    for index, task in enumerate(ro_task["tasks"]):
        if index == task_index:
            continue  # own task
        already_created = task["action"] == "CREATE" and \
            task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED")
        if already_created:
            return task["status"], "COPY_VIM_INFO"

    try:
        task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
        # TODO update ro_vim_item_update
    return task_status, ro_vim_item_update
| |
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Locate the task that another task depends on.
    :param task_id: either a target_record_id (it starts with "nsrs:" or "vnfrs:") or a task_id
    :param ro_task: optional ro_task where a task_id is looked for before reading the database
    :param target_id: target_id used to filter the database query when task_id is a target_record_id
    :return: tuple (ro_task that contains the task, index of the task at ro_task["tasks"])
    :raises NsWorkerException: if the task is not found
    """
    if task_id.startswith(("nsrs:", "vnfrs:")):
        # the dependency is identified by the record it targets
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id,
                      "tasks.target_record_id": task_id
                      },
            fail_on_empty=False)
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                if task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        # the dependency is identified by its task_id: look first at the provided ro_task
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task["task_id"] == task_id:
                    return ro_task, task_index
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"tasks.ANYINDEX.task_id": task_id,
                      "tasks.ANYINDEX.target_record.ne": None
                      },
            fail_on_empty=False)
        if ro_task_dependency:
            # enumerate is needed to obtain the index; iterating the list alone yields the task dicts only
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                if task["task_id"] == task_id:
                    return ro_task_dependency, task_index
    raise NsWorkerException("Cannot get depending task {}".format(task_id))
| |
def _proccess_pending_tasks(self, ro_task):
    """
    Process the pending tasks of a locked ro_task and write the result at database.
    Tasks are visited by action in the order DELETE, CREATE, EXEC:
    - DELETE and EXEC tasks are processed only while they are SCHEDULED
    - a SCHEDULED CREATE task creates the item, or copies the vim_info when another CREATE task of this ro_task
      is already at BUILD or DONE. A CREATE task in other status (except FINISHED, SUPERSEDED) is refreshed once
      vim_info.refresh_at has been reached
    A SCHEDULED task is skipped while a task it depends on is still SCHEDULED, and it fails if a task it depends
    on is FAILED.
    At the end the ro_task is unlocked and its to_check_at is updated, unless to_check_at has been modified at
    database meanwhile.
    :param ro_task: ro_task read from database. Its "vim_info" is updated in place
    :return: None. Errors are logged
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    next_check_at = now + (24*60*60)  # one day
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()
        if task["item"] in ("image", "flavor"):
            next_refresh += self.REFRESH_IMAGE
        elif new_status == "BUILD":
            next_refresh += self.REFRESH_BUILD
        elif new_status == "DONE":
            next_refresh += self.REFRESH_ACTIVE
        else:
            next_refresh += self.REFRESH_ERROR
        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # 0 get task_status_create
        task_status_create = None
        task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
                            t["status"] in ("BUILD", "DONE")), None)
        if task_create:
            task_status_create = task_create["status"]
        # 1. look for SCHEDULED or if CREATE also DONE,BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            for task_index, task in enumerate(ro_task["tasks"]):
                target_update = None
                # results are per task: without this reset, a task that produces no result would see the values
                # of the previous task (or fail with an unbound variable when it is the first one)
                new_status = None
                db_vim_info_update = None
                if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
                        task["action"] != task_action or \
                        (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
                    continue
                task_path = "tasks.{}.status".format(task_index)
                try:
                    if task["status"] == "SCHEDULED":
                        task_depends = {}
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False
                        for dependency_task_id in (task.get("depends_on") or ()):
                            dependency_ro_task, dependency_task_index = \
                                self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
                            dependency_task = dependency_ro_task["tasks"][dependency_task_index]
                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"], task["item"], dependency_task["action"],
                                    dependency_task["item"], dependency_task_id,
                                    dependency_ro_task["vim_info"].get("vim_details"))
                                self.logger.error("task={} {}".format(task["task_id"], error_text))
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency can be referenced by its id or by "TASK-<id>"
                            task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
                            task_depends["TASK-{}".format(dependency_task_id)] = \
                                dependency_ro_task["vim_info"]["vim_id"]
                        if dependency_not_completed:
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    if task["action"] == "DELETE":
                        new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                           task_depends, db_ro_task_update)
                        new_status = "FINISHED" if new_status == "DONE" else new_status
                        # ^with FINISHED instead of DONE it will not be refreshing
                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        # NOTE(review): assumes the EXEC methods return (status, vim_info update) as the
                        # create/delete/refresh methods do; confirm against exec_vm
                        new_status, db_vim_info_update = \
                            self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update)
                        new_status = "FINISHED" if new_status == "DONE" else new_status
                        # ^with FINISHED instead of DONE it will not be refreshing
                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # already created by another task: only copy the result to this target
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = \
                                    self.item2create[task["item"]](ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
                                new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
                                _update_refresh(new_status)
                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
                    if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
                        self.logger.error("Unexpected exception at _proccess_pending_tasks task={}: {}".
                                          format(task["task_id"], e), exc_info=True)

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status
                    if target_update or db_vim_update:

                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    self.logger.error("Unexpected exception at _update_target task={}: {}".
                                      format(task["task_id"], e), exc_info=True)

        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
        db_ro_task_update["to_check_at"] = next_check_at
        if not self.db.set_one("ro_tasks",
                               update_dict=db_ro_task_update,
                               q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
                               fail_on_empty=False):
            del db_ro_task_update["to_check_at"]
            self.db.set_one("ro_tasks",
                            q_filter={"_id": ro_task["_id"]},
                            update_dict=db_ro_task_update,
                            fail_on_empty=True)
    except DbException as e:
        self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
    except Exception as e:
        self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
| |
def _update_target(self, task, ro_vim_item_update):
    """
    Write the VIM information of an item at the record pointed by the task.
    :param task: task whose "target_record" has the form "<table>:<_id>:<path>"
    :param ro_vim_item_update: dictionary with the VIM information. Only vim_id, vim_details, vim_name,
        vim_status and interfaces are copied under <path>. For each non-empty interface, ip_address, mac_address,
        vlan, compute_node and pci are copied under "<path without its two last elements>.interfaces.<index>",
        and the first ip address of the management interfaces is copied to the 'ip-address' fields.
        If empty or None, <path> is removed from the record
    :return: None. Database errors are logged
    """
    item_keys = ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')
    iface_keys = ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')
    try:
        table, _id, path = task["target_record"].split(":")
        if not ro_vim_item_update:
            self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
                            unset={path: None})
            return

        update_dict = {}
        for key, value in ro_vim_item_update.items():
            if key in item_keys:
                update_dict[path + "." + key] = value

        interfaces = ro_vim_item_update.get("interfaces")
        if interfaces:
            # remove the two last elements of the path
            path_vdu = path
            for _ in range(2):
                path_vdu = path_vdu[:path_vdu.rfind(".")]
            path_interfaces = path_vdu + ".interfaces"
            for position, iface in enumerate(interfaces):
                if not iface:
                    continue
                for key, value in iface.items():
                    if key in iface_keys:
                        update_dict[path_interfaces + ".{}.".format(position) + key] = value
                ip_address = iface.get("ip_address")
                if iface.get("mgmt_vnf_interface") and ip_address:
                    update_dict["ip-address"] = ip_address.split(";")[0]
                if iface.get("mgmt_vdu_interface") and ip_address:
                    update_dict[path_vdu + ".ip-address"] = ip_address.split(";")[0]

        self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
    except DbException as e:
        self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
| |
def new_image(self, ro_task, task_index, task_depends):
    """
    Look for an image at the VIM using the find_params of the task. Images are only found, never created here.
    :param ro_task: ro_task that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: not used here
    :return: tuple ("DONE", vim_info update with the vim_id of the image) when exactly one image is found;
        ("FAILED", vim_info update with the error) when none or several images are found, the task has no
        find_params, or the VIM fails
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        # FIND
        find_params = task.get("find_params")
        if not find_params:
            # without criteria there is nothing to look for; fail explicitly instead of using an unset vim_id
            raise NsWorkerExceptionNotFound("Image cannot be found: the task has no find_params")
        vim_images = target_vim.get_image_list(**find_params)
        if not vim_images:
            raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(find_params))
        if len(vim_images) > 1:
            raise NsWorkerException(
                "More than one image found with this criteria: '{}'".format(find_params))
        vim_image_id = vim_images[0]["id"]

        ro_vim_item_update = {"vim_id": vim_image_id,
                              "vim_status": "DONE",
                              "created": created,
                              "created_items": created_items,
                              "vim_details": None}
        self.logger.debug(
            "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
        return "DONE", ro_vim_item_update
    except (NsWorkerException, vimconn.VimConnException) as e:
        self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
        ro_vim_item_update = {"vim_status": "VIM_ERROR",
                              "created": created,
                              "vim_details": str(e)}
        return "FAILED", ro_vim_item_update
| |
def del_flavor(self, ro_task, task_index):
    """
    Delete a flavor from the VIM.
    :param ro_task: ro_task whose vim_info.vim_id is the flavor to delete; nothing is requested to the VIM when
        it is empty
    :param task_index: index of the task at ro_task["tasks"]; only used for logging
    :return: tuple ("DONE", vim_info update) when deleted, already deleted or nothing to delete;
        ("FAILED", vim_info update with the error) if the VIM fails
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    flavor_vim_id = ro_task["vim_info"]["vim_id"]
    deleted_info = {"vim_status": "DELETED",
                    "created": False,
                    "vim_details": "DELETED",
                    "vim_id": None}
    try:
        if flavor_vim_id:
            self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
    except vimconn.VimConnNotFoundException:
        deleted_info["vim_details"] = "already deleted"
    except vimconn.VimConnException as e:
        self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
            ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "vim_details": "Error while deleting: {}".format(e)}

    self.logger.debug("task={} {} del-flavor={} {}".format(
        task_id, ro_task["target_id"], flavor_vim_id, deleted_info.get("vim_details", "")))
    return "DONE", deleted_info
| |
def refresh_ok(self, ro_task):
    """
    Refresh without calling the VIM; used for images and flavors. The item is assumed ok unless it is in error.
    :param ro_task: ro_task whose vim_info.vim_status is read
    :return: ("FAILED", {}) if vim_status is "VIM_ERROR"; ("DONE", {}) otherwise
    """
    in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
    task_status = "FAILED" if in_error else "DONE"
    return task_status, {}
| |
def delete_ok(self, ro_task, task_index=None):
    """
    Delete without calling the VIM; used for images. Assumes ok.
    :param ro_task: not used
    :param task_index: not used. Accepted because the delete methods are invoked as
        item2delete[item](ro_task, task_index)
    :return: ("DONE", {})
    """
    return "DONE", {}
| |
def new_flavor(self, ro_task, task_index, task_depends):
    """
    Find a flavor at the VIM or create it.
    The flavor is first looked for with find_params["flavor_data"], if find_params are provided. When it is not
    found and the task has params, it is created with params["flavor_data"].
    :param ro_task: ro_task that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: not used here
    :return: tuple ("DONE", vim_info update with vim_id and created), or ("FAILED", vim_info update with the
        error) if the VIM fails
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        vim_flavor_id = None
        # FIND
        find_params = task.get("find_params")
        if find_params:
            try:
                vim_flavor_id = target_vim.get_flavor_id_from_data(find_params["flavor_data"])
            except vimconn.VimConnNotFoundException:
                pass  # not found: it is created below if params are provided

        # CREATE
        must_create = not vim_flavor_id and task.get("params")
        if must_create:
            vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
            created = True

        self.logger.debug(
            "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
        return "DONE", {"vim_id": vim_flavor_id,
                        "vim_status": "DONE",
                        "created": created,
                        "created_items": created_items,
                        "vim_details": None}
    except (vimconn.VimConnException, NsWorkerException) as e:
        self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "created": created,
                          "vim_details": str(e)}
| |
def new_net(self, ro_task, task_index, task_depends):
    """
    Find a network at the VIM or create it.
    If the task has find_params the network is looked for: by find_params["filter_dict"] or, for a management
    network (find_params["mgmt"]), by the management_network_id or management_network_name of the vim config,
    or else by find_params["name"]. When no network is found, or there are no find_params, the network is created
    with the params of the task.
    :param ro_task: ro_task that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: not used here
    :return: tuple ("BUILD", vim_info update with vim_id, created and created_items), or ("FAILED", vim_info
        update with the error) if the network is not found and cannot be created, several networks match, or
        the VIM fails
    """
    vim_net_id = None
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        vim_nets = []
        find_params = task.get("find_params")
        # FIND
        if find_params:
            # if management, get configuration of VIM
            if find_params.get("filter_dict"):
                vim_filter = find_params["filter_dict"]
            elif find_params.get("mgmt"):  # management network
                db_vim = self.db_vims[ro_task["target_id"]]
                if deep_get(db_vim, "config", "management_network_id"):
                    vim_filter = {"id": db_vim["config"]["management_network_id"]}
                elif deep_get(db_vim, "config", "management_network_name"):
                    vim_filter = {"name": db_vim["config"]["management_network_name"]}
                else:
                    vim_filter = {"name": find_params["name"]}
            else:
                raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(find_params))

            vim_nets = target_vim.get_network_list(vim_filter)
            if not vim_nets and not task.get("params"):
                raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
                    find_params))
            elif len(vim_nets) > 1:
                raise NsWorkerException(
                    "More than one network found with this criteria: '{}'".format(find_params))

        if vim_nets:
            vim_net_id = vim_nets[0]["id"]
        elif task.get("params"):
            # CREATE: nothing found, or nothing to look for
            vim_net_id, created_items = target_vim.new_network(**task["params"])
            created = True
        else:
            raise NsWorkerException("Cannot create network: the task has neither find_params nor params")

        ro_vim_item_update = {"vim_id": vim_net_id,
                              "vim_status": "BUILD",
                              "created": created,
                              "created_items": created_items,
                              "vim_details": None}
        self.logger.debug(
            "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
        return "BUILD", ro_vim_item_update
    except (vimconn.VimConnException, NsWorkerException) as e:
        self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
        ro_vim_item_update = {"vim_status": "VIM_ERROR",
                              "created": created,
                              "vim_details": str(e)}
        return "FAILED", ro_vim_item_update
| |
def refresh_net(self, ro_task):
    """
    Call VIM to get network status and compute the changes over the stored vim_info.
    :param ro_task: ro_task of the network; vim_info.vim_id is the network asked to the VIM
    :return: tuple (task status, vim_info update). Task status is "DONE" if the VIM reports ACTIVE, "BUILD" if it
        reports BUILD and "FAILED" otherwise, including a failure of the VIM. The update only contains the values
        of vim_status, vim_name, vim_details and vim_id that change
    """
    ro_task_id = ro_task["_id"]
    target_vim = self.my_vims[ro_task["target_id"]]
    stored = ro_task["vim_info"]
    vim_id = stored["vim_id"]
    status_map = {"ACTIVE": "DONE", "BUILD": "BUILD"}
    try:
        vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
        task_status = status_map.get(vim_info["status"], "FAILED")
    except vimconn.VimConnException as e:
        # Mark all tasks at VIM_ERROR status
        self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
        task_status = "FAILED"

    reported_status = vim_info["status"]
    ro_vim_item_update = {}
    if stored["vim_status"] != reported_status:
        ro_vim_item_update["vim_status"] = reported_status
    if stored["vim_name"] != vim_info.get("name"):
        ro_vim_item_update["vim_name"] = vim_info.get("name")

    if reported_status in ("ERROR", "VIM_ERROR"):
        if stored["vim_details"] != vim_info["error_msg"]:
            ro_vim_item_update["vim_details"] = vim_info["error_msg"]
    elif reported_status == "DELETED":
        ro_vim_item_update["vim_id"] = None
        ro_vim_item_update["vim_details"] = "Deleted externally"
    elif stored["vim_details"] != vim_info["vim_info"]:
        ro_vim_item_update["vim_details"] = vim_info["vim_info"]

    if ro_vim_item_update:
        # details are logged only when the new status is not ACTIVE
        if ro_vim_item_update.get("vim_status") != "ACTIVE":
            logged_details = ro_vim_item_update.get("vim_details")
        else:
            logged_details = ''
        self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
            ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), logged_details))
    return task_status, ro_vim_item_update
| |
def del_net(self, ro_task, task_index):
    """
    Delete a network from the VIM.
    :param ro_task: ro_task whose vim_info.vim_id and vim_info.created_items are passed to the VIM; nothing is
        requested to the VIM when both are empty
    :param task_index: index of the task at ro_task["tasks"]; only used for logging
    :return: tuple ("DONE", vim_info update) when deleted, already deleted or nothing to delete;
        ("FAILED", vim_info update with the error) if the VIM fails
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    vim_info = ro_task["vim_info"]
    net_vim_id = vim_info["vim_id"]
    deleted_info = {"vim_status": "DELETED",
                    "created": False,
                    "vim_details": "DELETED",
                    "vim_id": None}
    try:
        if net_vim_id or vim_info["created_items"]:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_network(net_vim_id, vim_info["created_items"])
    except vimconn.VimConnNotFoundException:
        deleted_info["vim_details"] = "already deleted"
    except vimconn.VimConnException as e:
        self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
                                                                    net_vim_id, e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "vim_details": "Error while deleting: {}".format(e)}

    self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
                                                        deleted_info.get("vim_details", "")))
    return "DONE", deleted_info
| |
def new_vm(self, ro_task, task_index, task_depends):
    """
    Create a VM at the VIM.
    The params of the task are copied, and the references "TASK-..." found at net_list[].net_id, image_id and
    flavor_id are replaced by the VIM identifiers provided at task_depends before calling new_vminstance.
    :param ro_task: ro_task that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: dictionary "TASK-<id>" -> VIM identifier of the item created by that task
    :return: tuple ("BUILD", vim_info update with vim_id, created_items and interfaces_vim_ids), or
        ("FAILED", vim_info update with the error) if a needed network is not available or the VIM fails.
        "created" is reported as True in both cases
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]
    created = True
    try:
        params_copy = deepcopy(task["params"])
        for net in params_copy["net_list"]:
            if "net_id" in net and net["net_id"].startswith("TASK-"):  # change task_id into network_id
                network_id = task_depends[net["net_id"]]
                if not network_id:
                    raise NsWorkerException("Cannot create VM because depends on a network not created or found "
                                            "for {}".format(net["net_id"]))
                net["net_id"] = network_id
        for reference in ("image_id", "flavor_id"):
            if params_copy[reference].startswith("TASK-"):
                params_copy[reference] = task_depends[params_copy[reference]]

        vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
        # the 'vim_id' of each entry of net_list is read after the VM creation
        interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

        self.logger.debug(
            "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
        return "BUILD", {"vim_id": vim_vm_id,
                         "vim_status": "BUILD",
                         "created": created,
                         "created_items": created_items,
                         "vim_details": None,
                         "interfaces_vim_ids": interfaces,
                         "interfaces": [],
                         }
    except (vimconn.VimConnException, NsWorkerException) as e:
        self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "created": created,
                          "vim_details": str(e)}
| |
def del_vm(self, ro_task, task_index):
    """
    Delete a VM from the VIM.
    :param ro_task: ro_task whose vim_info.vim_id and vim_info.created_items are passed to the VIM; nothing is
        requested to the VIM when both are empty
    :param task_index: index of the task at ro_task["tasks"]; only used for logging
    :return: tuple ("DONE", vim_info update) when deleted, already deleted or nothing to delete;
        ("FAILED", vim_info update with the error) if the VIM fails
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    vim_info = ro_task["vim_info"]
    vm_vim_id = vim_info["vim_id"]
    deleted_info = {"vim_status": "DELETED",
                    "created": False,
                    "vim_details": "DELETED",
                    "vim_id": None}
    try:
        if vm_vim_id or vim_info["created_items"]:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_vminstance(vm_vim_id, vim_info["created_items"])
    except vimconn.VimConnNotFoundException:
        deleted_info["vim_details"] = "already deleted"
    except vimconn.VimConnException as e:
        self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
                                                                   vm_vim_id, e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "vim_details": "Error while deleting: {}".format(e)}

    self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
                                                       deleted_info.get("vim_details", "")))
    return "DONE", deleted_info
| |
def refresh_vm(self, ro_task):
    """
    Call VIM to get vm status
    :param ro_task: ro_task document. It reads "_id", "target_id", the first item of "tasks" and, from "vim_info",
        the keys "vim_id", "interfaces_vim_ids", "interfaces", "vim_status", "vim_name" and "vim_details"
    :return: tuple (task_status, ro_vim_item_update). task_status is "DONE" when the VIM reports ACTIVE, "BUILD"
        when it reports BUILD and "FAILED" otherwise, including when the connector raises VimConnException.
        ro_vim_item_update contains only the vim_info entries whose value differs from the stored one.
        It returns (None, None) when there is no "vim_id", as there is nothing to refresh
    """
    ro_task_id = ro_task["_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    vim_id = ro_task["vim_info"]["vim_id"]
    if not vim_id:
        return None, None
    vm_to_refresh_list = [vim_id]
    try:
        vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
        vim_info = vim_dict[vim_id]
        if vim_info["status"] == "ACTIVE":
            task_status = "DONE"
        elif vim_info["status"] == "BUILD":
            task_status = "BUILD"
        else:
            task_status = "FAILED"
    except vimconn.VimConnException as e:
        # Mark all tasks at VIM_ERROR status
        self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
        task_status = "FAILED"

    ro_vim_item_update = {}
    # TODO check and update interfaces
    # Interfaces are not present in the VIM_ERROR fallback built above, and may be absent from the VIM answer
    # (e.g. VM deleted), so they are looked up only when reported
    vim_interfaces = []
    reported_interfaces = vim_info.get("interfaces")
    if reported_interfaces:
        for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
            # None is stored for an expected interface that the VIM does not report, so that positions are kept
            iface = next((iface for iface in reported_interfaces if vim_iface_id == iface["vim_interface_id"]), None)
            vim_interfaces.append(iface)

    task = ro_task["tasks"][0]  # TODO look for a task CREATE and active
    if vim_interfaces:
        # management flags can only be set on interfaces that have been found
        mgmt_vnf_iface = task.get("mgmt_vnf_interface")
        if mgmt_vnf_iface is not None and vim_interfaces[mgmt_vnf_iface] is not None:
            vim_interfaces[mgmt_vnf_iface]["mgmt_vnf_interface"] = True
        # the vdu management interface defaults to the vnf management one, and then to the first interface
        mgmt_vdu_iface = task.get("mgmt_vdu_interface")
        if mgmt_vdu_iface is None:
            mgmt_vdu_iface = mgmt_vnf_iface if mgmt_vnf_iface is not None else 0
        if vim_interfaces[mgmt_vdu_iface] is not None:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

    if ro_task["vim_info"]["interfaces"] != vim_interfaces:
        ro_vim_item_update["interfaces"] = vim_interfaces
    if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
        ro_vim_item_update["vim_status"] = vim_info["status"]
    if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
        ro_vim_item_update["vim_name"] = vim_info.get("name")
    if vim_info["status"] in ("ERROR", "VIM_ERROR"):
        # "error_msg" and "vim_info" are read with get() because the VIM answer may not include them
        if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
            ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
    elif vim_info["status"] == "DELETED":
        ro_vim_item_update["vim_id"] = None
        ro_vim_item_update["vim_details"] = "Deleted externally"
    else:
        if ro_task["vim_info"]["vim_details"] != vim_info.get("vim_info"):
            ro_vim_item_update["vim_details"] = vim_info.get("vim_info")
    if ro_vim_item_update:
        self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
            ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
            ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
    return task_status, ro_vim_item_update
| |
def exec_vm(self, ro_task, task_index, task_depends):
    """
    Inject a user key into a VM by calling inject_user_key of the VIM connector
    :param ro_task: ro_task document; "target_id" and "tasks" are read
    :param task_index: position at ro_task["tasks"] of the task to execute. Its "params" must contain
        "private_key", "schema_version", "salt" and "key"; a copy of them is used, so the task is not modified
    :param task_depends: not used by this action
    :return: tuple ("DONE", content of params "key") on success, or ("FAILED", {"vim_details": error text}) when
        VimConnException or NsWorkerException is raised
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        params_copy = deepcopy(task["params"])
        # the connector receives the decrypted key as "use_pri_key" in place of the three encryption params
        params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
                                                     params_copy.pop("schema_version"),
                                                     params_copy.pop("salt"))

        target_vim.inject_user_key(**params_copy)
        self.logger.debug(
            "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
        return "DONE", params_copy["key"]
    except (vimconn.VimConnException, NsWorkerException) as e:
        # labelled as the debug message above; it was previously logged as "new-vm", which is another operation
        self.logger.error("task={} vim={} action-vm=inject_key: {}".format(task_id, ro_task["target_id"], e))
        ro_vim_item_update = {"vim_details": str(e)}
        return "FAILED", ro_vim_item_update
| |
def run(self):
    """
    Main loop of the thread. On every iteration a command is read from self.task_queue: "terminate" ends the
    loop and "load_vim" calls self._load_vim with the command argument. Only when the queue is empty, one
    ro_task obtained with self._get_db_task() is processed, sleeping 5 seconds if there is none.
    Any exception raised while processing the database task is logged and the loop goes on
    """
    self.logger.debug("Starting")
    while True:
        try:
            # wait at the queue while no VIM is loaded; otherwise only poll it
            command = self.task_queue.get(block=not self.my_vims)
            action = command[0]
            if action == "terminate":
                break
            if action == "load_vim":
                self._load_vim(command[1])
            # after a command, look for the next one before going to the database
            continue
        except queue.Empty:
            pass

        try:
            pending_ro_task = self._get_db_task()
            if pending_ro_task:
                self._proccess_pending_tasks(pending_ro_task)
            else:
                time.sleep(5)
        except Exception as exc:
            self.logger.critical("Unexpected exception at run: " + str(exc), exc_info=True)

    self.logger.debug("Finishing")