Feature 7184 New Generation RO
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py
new file mode 100644 (file)
index 0000000..0b96c53
--- /dev/null
@@ -0,0 +1,919 @@
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+##
+
+""""
+This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
+The tasks are stored at database in table ro_tasks
+A single ro_task refers to a VIM element (flavor, image, network, ...).
+A ro_task can contain several 'tasks', each one with a target, where to store the results
+"""
+
+import threading
+import time
+import queue
+import logging
+from pkg_resources import iter_entry_points
+# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
+from osm_common.dbbase import DbException
+# from osm_common.fsbase import FsException
+# from osm_common.msgbase import MsgException
+from osm_ro_plugin.vim_dummy import VimDummyConnector
+from osm_ro_plugin import vimconn
+from copy import deepcopy
+from unittest.mock import Mock
+
+__author__ = "Alfonso Tierno"
+__date__ = "$28-Sep-2017 12:07:15$"
+
+
+def deep_get(target_dict, *args, **kwargs):
+    """
+    Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
+    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
+    :param target_dict: dictionary to be read
+    :param args: list of keys to read from  target_dict
+    :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
+    :return: The wanted value if exist, None or default otherwise
+    """
+    for key in args:
+        if not isinstance(target_dict, dict) or key not in target_dict:
+            return kwargs.get("default")
+        target_dict = target_dict[key]
+    return target_dict
+
+
+class NsWorkerException(Exception):
+    pass
+
+
+class FailingConnector:
+    def __init__(self, error_msg):
+        self.error_msg = error_msg
+        for method in dir(vimconn.VimConnector):
+            if method[0] != "_":
+                setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
+
+
+class NsWorkerExceptionNotFound(NsWorkerException):
+    pass
+
+
+class NsWorker(threading.Thread):
+    REFRESH_BUILD = 5  # 5 seconds
+    REFRESH_ACTIVE = 60  # 1 minute
+    REFRESH_ERROR = 600
+    REFRESH_IMAGE = 3600 * 10
+    REFRESH_DELETE = 3600 * 10
+    QUEUE_SIZE = 2000
+    # TODO delete assigment_lock = Lock()
+    terminate = False
+    # TODO delete assignment = {}
+    MAX_TIME_LOCKED = 3600
+
+    def __init__(self, worker, config, plugins, db):
+        """Init a thread.
+        Arguments:
+            'id' number of thead
+            'name' name of thread
+            'host','user':  host ip or name to manage and user
+            'db', 'db_lock': database class and lock to use it in exclusion
+        """
+        threading.Thread.__init__(self)
+        self.config = config
+        self.plugins = plugins
+        self.plugin_name = "unknown"
+        self.logger = logging.getLogger('ro.worker{}'.format("worker"))
+        self.worker_id = worker
+        self.task_queue = queue.Queue(self.QUEUE_SIZE)
+        self.my_vims = {}   # targetvim: vimplugin class
+        self.db_vims = {}   # targetvim: vim information from database
+        self.vim_targets = []   # targetvim list
+        self.my_id = config["process_id"] + ":" + str(worker)
+        self.db = db
+        self.item2create = {
+            "net": self.new_net,
+            "vdu": self.new_vm,
+            "image": self.new_image,
+            "flavor": self.new_flavor,
+        }
+        self.item2refresh = {
+            "net": self.refresh_net,
+            "vdu": self.refresh_vm,
+            "image": self.refresh_ok,
+            "flavor": self.refresh_ok,
+        }
+        self.item2delete = {
+            "net": self.del_net,
+            "vdu": self.del_vm,
+            "image": self.delete_ok,
+            "flavor": self.del_flavor,
+        }
+        self.item2action = {
+            "vdu": self.exec_vm,
+        }
+        self.time_last_task_processed = None
+
+    def insert_task(self, task):
+        try:
+            self.task_queue.put(task, False)
+            return None
+        except queue.Full:
+            raise NsWorkerException("timeout inserting a task")
+
+    def terminate(self):
+        self.insert_task("exit")
+
+    def del_task(self, task):
+        with self.task_lock:
+            if task["status"] == "SCHEDULED":
+                task["status"] = "SUPERSEDED"
+                return True
+            else:  # task["status"] == "processing"
+                self.task_lock.release()
+                return False
+
+    def _load_plugin(self, name, type="vim"):
+        # type can be vim or sdn
+        if "rovim_dummy" not in self.plugins:
+            self.plugins["rovim_dummy"] = VimDummyConnector
+        if name in self.plugins:
+            return self.plugins[name]
+        try:
+            for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+                self.plugins[name] = v.load()
+        except Exception as e:
+            self.logger.critical("Cannot load osm_{}: {}".format(name, e))
+            if name:
+                self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
+        if name and name not in self.plugins:
+            error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
+                         " registered".format(t=type, n=name)
+            self.logger.critical(error_text)
+            self.plugins[name] = FailingConnector(error_text)
+
+        return self.plugins[name]
+
+    def _load_vim(self, vim_account_id):
+        target_id = "vim:" + vim_account_id
+        plugin_name = ""
+        vim = None
+        try:
+            step = "Getting vim={} from db".format(vim_account_id)
+            vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+
+            # if deep_get(vim, "config", "sdn-controller"):
+            #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
+            #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
+
+            step = "Decrypt password"
+            schema_version = vim.get("schema_version")
+            self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
+                                           schema_version=schema_version, salt=vim_account_id)
+
+            step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
+            plugin_name = "rovim_" + vim["vim_type"]
+            vim_module_conn = self._load_plugin(plugin_name)
+            self.my_vims[target_id] = vim_module_conn(
+                uuid=vim['_id'], name=vim['name'],
+                tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
+                url=vim['vim_url'], url_admin=None,
+                user=vim['vim_user'], passwd=vim['vim_password'],
+                config=vim.get('config'), persistent_info={}
+            )
+            self.vim_targets.append(target_id)
+            self.db_vims[target_id] = vim
+            self.error_status = None
+            self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
+                vim_account_id, plugin_name))
+        except Exception as e:
+            self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
+                vim_account_id, plugin_name, step, e))
+            self.db_vims[target_id] = vim or {}
+            self.my_vims[target_id] = FailingConnector(str(e))
+            self.error_status = "Error loading vimconnector: {}".format(e)
+
+    def _get_db_task(self):
+        """
+        Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
+        :return: None
+        """
+        now = time.time()
+        if not self.time_last_task_processed:
+            self.time_last_task_processed = now
+        try:
+            while True:
+                locked = self.db.set_one(
+                    "ro_tasks",
+                    q_filter={"target_id": self.vim_targets,
+                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                              "to_check_at.lt": self.time_last_task_processed},
+                    update_dict={"locked_by": self.my_id, "locked_at": now},
+                    fail_on_empty=False)
+                if locked:
+                    # read and return
+                    ro_task = self.db.get_one(
+                        "ro_tasks",
+                        q_filter={"target_id": self.vim_targets,
+                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                  "locked_at": now})
+                    return ro_task
+                if self.time_last_task_processed == now:
+                    self.time_last_task_processed = None
+                    return None
+                else:
+                    self.time_last_task_processed = now
+                    # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
+
+        except DbException as e:
+            self.logger.error("Database exception at _get_db_task: {}".format(e))
+        except Exception as e:
+            self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
+        return None
+
+    def _delete_task(self, ro_task, task_index, task_depends, db_update):
+        """
+        Determine if this task need to be done or superseded
+        :return: None
+        """
+        my_task = ro_task["tasks"][task_index]
+        task_id = my_task["task_id"]
+        needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
+        if my_task["status"] == "FAILED":
+            return None, None  # TODO need to be retry??
+        try:
+            for index, task in enumerate(ro_task["tasks"]):
+                if index == task_index:
+                    continue  # own task
+                if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
+                    # set to finished
+                    db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
+                elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
+                    needed_delete = False
+            if needed_delete:
+                return self.item2delete[my_task["item"]](ro_task, task_index)
+            else:
+                return "SUPERSEDED", None
+        except Exception as e:
+            if not isinstance(e, NsWorkerException):
+                self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
+                                     exc_info=True)
+            return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+
+    def _create_task(self, ro_task, task_index, task_depends, db_update):
+        """
+        Determine if this task need to be created
+        :return: None
+        """
+        my_task = ro_task["tasks"][task_index]
+        task_id = my_task["task_id"]
+        task_status = None
+        if my_task["status"] == "FAILED":
+            return None, None  # TODO need to be retry??
+        elif my_task["status"] == "SCHEDULED":
+            # check if already created by another task
+            for index, task in enumerate(ro_task["tasks"]):
+                if index == task_index:
+                    continue  # own task
+                if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
+                    return task["status"], "COPY_VIM_INFO"
+
+            try:
+                task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
+                # TODO update other CREATE tasks
+            except Exception as e:
+                if not isinstance(e, NsWorkerException):
+                    self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
+                task_status = "FAILED"
+                ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                # TODO update    ro_vim_item_update
+            return task_status, ro_vim_item_update
+        else:
+            return None, None
+
+    def _get_dependency(self, task_id, ro_task=None, target_id=None):
+        if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
+            ro_task_dependency = self.db.get_one(
+                "ro_tasks",
+                q_filter={"target_id": target_id,
+                          "tasks.target_record_id": task_id
+                          },
+                fail_on_empty=False)
+            if ro_task_dependency:
+                for task_index, task in enumerate(ro_task_dependency["tasks"]):
+                    if task["target_record_id"] == task_id:
+                        return ro_task_dependency, task_index
+
+        else:
+            if ro_task:
+                for task_index, task in enumerate(ro_task["tasks"]):
+                    if task["task_id"] == task_id:
+                        return ro_task, task_index
+            ro_task_dependency = self.db.get_one(
+                "ro_tasks",
+                q_filter={"tasks.ANYINDEX.task_id": task_id,
+                          "tasks.ANYINDEX.target_record.ne": None
+                          },
+                fail_on_empty=False)
+            if ro_task_dependency:
+                for task_index, task in ro_task_dependency["tasks"]:
+                    if task["task_id"] == task_id:
+                        return ro_task_dependency, task_index
+        raise NsWorkerException("Cannot get depending task {}".format(task_id))
+
+    def _proccess_pending_tasks(self, ro_task):
+        ro_task_id = ro_task["_id"]
+        now = time.time()
+        next_check_at = now + (24*60*60)   # one day
+        db_ro_task_update = {}
+
+        def _update_refresh(new_status):
+            # compute next_refresh
+            nonlocal task
+            nonlocal next_check_at
+            nonlocal db_ro_task_update
+            nonlocal ro_task
+
+            next_refresh = time.time()
+            if task["item"] in ("image", "flavor"):
+                next_refresh += self.REFRESH_IMAGE
+            elif new_status == "BUILD":
+                next_refresh += self.REFRESH_BUILD
+            elif new_status == "DONE":
+                next_refresh += self.REFRESH_ACTIVE
+            else:
+                next_refresh += self.REFRESH_ERROR
+            next_check_at = min(next_check_at, next_refresh)
+            db_ro_task_update["vim_info.refresh_at"] = next_refresh
+            ro_task["vim_info"]["refresh_at"] = next_refresh
+
+        try:
+            # 0 get task_status_create
+            task_status_create = None
+            task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
+                                t["status"] in ("BUILD", "DONE")), None)
+            if task_create:
+                task_status_create = task_create["status"]
+            # 1. look for SCHEDULED or if CREATE also DONE,BUILD
+            for task_action in ("DELETE", "CREATE", "EXEC"):
+                db_vim_update = None
+                for task_index, task in enumerate(ro_task["tasks"]):
+                    target_update = None
+                    if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
+                            task["action"] != task_action or \
+                            (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
+                        continue
+                    task_path = "tasks.{}.status".format(task_index)
+                    try:
+                        if task["status"] == "SCHEDULED":
+                            task_depends = {}
+                            # check if tasks that this depends on have been completed
+                            dependency_not_completed = False
+                            for dependency_task_id in (task.get("depends_on") or ()):
+                                dependency_ro_task, dependency_task_index = \
+                                    self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
+                                dependency_task = dependency_ro_task["tasks"][dependency_task_index]
+                                if dependency_task["status"] == "SCHEDULED":
+                                    dependency_not_completed = True
+                                    next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
+                                    break
+                                elif dependency_task["status"] == "FAILED":
+                                    error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
+                                        task["action"], task["item"], dependency_task["action"],
+                                        dependency_task["item"], dependency_task_id,
+                                        dependency_ro_task["vim_info"].get("vim_details"))
+                                    self.logger.error("task={} {}".format(task["task_id"], error_text))
+                                    raise NsWorkerException(error_text)
+
+                                task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
+                                task_depends["TASK-{}".format(dependency_task_id)] = \
+                                    dependency_ro_task["vim_info"]["vim_id"]
+                            if dependency_not_completed:
+                                # TODO set at vim_info.vim_details that it is waiting
+                                continue
+
+                        if task["action"] == "DELETE":
+                            new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
+                                                                               task_depends, db_ro_task_update)
+                            new_status = "FINISHED" if new_status == "DONE" else new_status
+                            # ^with FINISHED instead of DONE it will not be refreshing
+                            if new_status in ("FINISHED", "SUPERSEDED"):
+                                target_update = "DELETE"
+                        elif task["action"] == "EXEC":
+                            self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update)
+                            new_status = "FINISHED" if new_status == "DONE" else new_status
+                            # ^with FINISHED instead of DONE it will not be refreshing
+                            if new_status in ("FINISHED", "SUPERSEDED"):
+                                target_update = "DELETE"
+                        elif task["action"] == "CREATE":
+                            if task["status"] == "SCHEDULED":
+                                if task_status_create:
+                                    new_status = task_status_create
+                                    target_update = "COPY_VIM_INFO"
+                                else:
+                                    new_status, db_vim_info_update = \
+                                        self.item2create[task["item"]](ro_task, task_index, task_depends)
+                                    # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
+                                    _update_refresh(new_status)
+                            else:
+                                if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
+                                    new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
+                                    _update_refresh(new_status)
+                    except Exception as e:
+                        new_status = "FAILED"
+                        db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                        if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
+                            self.logger.error("Unexpected exception at _delete_task task={}: {}".
+                                              format(task["task_id"], e), exc_info=True)
+
+                    try:
+                        if db_vim_info_update:
+                            db_vim_update = db_vim_info_update.copy()
+                            db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
+                            ro_task["vim_info"].update(db_vim_info_update)
+
+                        if new_status:
+                            if task_action == "CREATE":
+                                task_status_create = new_status
+                            db_ro_task_update[task_path] = new_status
+                        if target_update or db_vim_update:
+
+                            if target_update == "DELETE":
+                                self._update_target(task, None)
+                            elif target_update == "COPY_VIM_INFO":
+                                self._update_target(task, ro_task["vim_info"])
+                            else:
+                                self._update_target(task, db_vim_update)
+
+                    except Exception as e:
+                        self.logger.error("Unexpected exception at _update_target task={}: {}".
+                                          format(task["task_id"], e), exc_info=True)
+
+            # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
+            # outside this task (by ro_nbi) do not update it
+            db_ro_task_update["locked_by"] = None
+            # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
+            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+            db_ro_task_update["to_check_at"] = next_check_at
+            if not self.db.set_one("ro_tasks",
+                                   update_dict=db_ro_task_update,
+                                   q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
+                                   fail_on_empty=False):
+                del db_ro_task_update["to_check_at"]
+                self.db.set_one("ro_tasks",
+                                q_filter={"_id": ro_task["_id"]},
+                                update_dict=db_ro_task_update,
+                                fail_on_empty=True)
+        except DbException as e:
+            self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
+        except Exception as e:
+            self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
+
+    def _update_target(self, task, ro_vim_item_update):
+        try:
+            table, _id, path = task["target_record"].split(":")
+            if ro_vim_item_update:
+                update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
+                               ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
+                if ro_vim_item_update.get("interfaces"):
+                    path_vdu = path[:path.rfind(".")]
+                    path_vdu = path_vdu[:path_vdu.rfind(".")]
+                    path_interfaces = path_vdu + ".interfaces"
+                    for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
+                        if iface:
+                            update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
+                                                k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
+                            if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
+                                update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
+                            if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
+                                update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]
+
+                self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
+            else:
+                self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
+                                unset={path: None})
+        except DbException as e:
+            self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
+
+    def new_image(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            if task.get("find_params"):
+                vim_images = target_vim.get_image_list(**task["find_params"])
+                if not vim_images:
+                    raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
+                        task["find_params"]))
+                elif len(vim_images) > 1:
+                    raise NsWorkerException(
+                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
+                else:
+                    vim_image_id = vim_images[0]["id"]
+
+            ro_vim_item_update = {"vim_id": vim_image_id,
+                                  "vim_status": "DONE",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
+            return "DONE", ro_vim_item_update
+        except (NsWorkerException, vimconn.VimConnException) as e:
+            self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def del_flavor(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        flavor_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if flavor_vim_id:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_flavor(flavor_vim_id)
+
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
+                ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-flavor={} {}".format(
+            task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def refresh_ok(self, ro_task):
+        """skip calling VIM to get image status. Assumes ok"""
+        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
+            return "FAILED", {}
+        return "DONE", {}
+
+    def delete_ok(self, ro_task):
+        """skip calling VIM to delete image status. Assumes ok"""
+        return "DONE", {}
+
+    def new_flavor(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            vim_flavor_id = None
+            if task.get("find_params"):
+                try:
+                    flavor_data = task["find_params"]["flavor_data"]
+                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
+                except vimconn.VimConnNotFoundException:
+                    pass
+
+            if not vim_flavor_id and task.get("params"):
+                # CREATE
+                flavor_data = task["params"]["flavor_data"]
+                vim_flavor_id = target_vim.new_flavor(flavor_data)
+                created = True
+
+            ro_vim_item_update = {"vim_id": vim_flavor_id,
+                                  "vim_status": "DONE",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
+            return "DONE", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def new_net(self, ro_task, task_index, task_depends):
+        vim_net_id = None
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            if task.get("find_params"):
+                # if management, get configuration of VIM
+                if task["find_params"].get("filter_dict"):
+                    vim_filter = task["find_params"]["filter_dict"]
+                elif task["find_params"].get("mgmt"):   # mamagement network
+                    if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
+                        vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
+                    elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
+                        vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
+                    else:
+                        vim_filter = {"name": task["find_params"]["name"]}
+                else:
+                    raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
+
+                vim_nets = target_vim.get_network_list(vim_filter)
+                if not vim_nets and not task.get("params"):
+                    raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
+                        task.get("find_params")))
+                elif len(vim_nets) > 1:
+                    raise NsWorkerException(
+                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
+                if vim_nets:
+                    vim_net_id = vim_nets[0]["id"]
+            else:
+                # CREATE
+                params = task["params"]
+                vim_net_id, created_items = target_vim.new_network(**params)
+                created = True
+
+            ro_vim_item_update = {"vim_id": vim_net_id,
+                                  "vim_status": "BUILD",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
+            return "BUILD", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def refresh_net(self, ro_task):
+        """Call VIM to get network status"""
+        ro_task_id = ro_task["_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+
+        vim_id = ro_task["vim_info"]["vim_id"]
+        net_to_refresh_list = [vim_id]
+        try:
+            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+            vim_info = vim_dict[vim_id]
+            if vim_info["status"] == "ACTIVE":
+                task_status = "DONE"
+            elif vim_info["status"] == "BUILD":
+                task_status = "BUILD"
+            else:
+                task_status = "FAILED"
+        except vimconn.VimConnException as e:
+            # Mark all tasks at VIM_ERROR status
+            self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+            task_status = "FAILED"
+
+        ro_vim_item_update = {}
+        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+            ro_vim_item_update["vim_status"] = vim_info["status"]
+        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+            ro_vim_item_update["vim_name"] = vim_info.get("name")
+        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
+                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+        elif vim_info["status"] == "DELETED":
+            ro_vim_item_update["vim_id"] = None
+            ro_vim_item_update["vim_details"] = "Deleted externally"
+        else:
+            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+        if ro_vim_item_update:
+            self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+        return task_status, ro_vim_item_update
+
+    def del_net(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        net_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if net_vim_id or ro_task["vim_info"]["created_items"]:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
+
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                        net_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
+                                                            ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def new_vm(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            created = True
+            params = task["params"]
+            params_copy = deepcopy(params)
+            net_list = params_copy["net_list"]
+            for net in net_list:
+                if "net_id" in net and net["net_id"].startswith("TASK-"):  # change task_id into network_id
+                    network_id = task_depends[net["net_id"]]
+                    if not network_id:
+                        raise NsWorkerException("Cannot create VM because depends on a network not created or found "
+                                                "for {}".format(net["net_id"]))
+                    net["net_id"] = network_id
+            if params_copy["image_id"].startswith("TASK-"):
+                params_copy["image_id"] = task_depends[params_copy["image_id"]]
+            if params_copy["flavor_id"].startswith("TASK-"):
+                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
+
+            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
+            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+
+            ro_vim_item_update = {"vim_id": vim_vm_id,
+                                  "vim_status": "BUILD",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None,
+                                  "interfaces_vim_ids": interfaces,
+                                  "interfaces": [],
+                                  }
+            self.logger.debug(
+                "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
+            return "BUILD", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def del_vm(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        vm_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if vm_vim_id or ro_task["vim_info"]["created_items"]:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
+
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                       vm_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
+                                                           ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def refresh_vm(self, ro_task):
+        """Call VIM to get vm status"""
+        ro_task_id = ro_task["_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+
+        vim_id = ro_task["vim_info"]["vim_id"]
+        if not vim_id:
+            return None, None
+        vm_to_refresh_list = [vim_id]
+        try:
+            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
+            vim_info = vim_dict[vim_id]
+            if vim_info["status"] == "ACTIVE":
+                task_status = "DONE"
+            elif vim_info["status"] == "BUILD":
+                task_status = "BUILD"
+            else:
+                task_status = "FAILED"
+        except vimconn.VimConnException as e:
+            # Mark all tasks at VIM_ERROR status
+            self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+            task_status = "FAILED"
+
+        ro_vim_item_update = {}
+        # TODO check and update interfaces
+        vim_interfaces = []
+        for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
+            iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None)
+            # if iface:
+            #     iface.pop("vim_info", None)
+            vim_interfaces.append(iface)
+
+        task = ro_task["tasks"][0]  # TODO look for a task CREATE and active
+        if task.get("mgmt_vnf_interface") is not None:
+            vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
+        mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
+        vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+
+        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
+            ro_vim_item_update["interfaces"] = vim_interfaces
+        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+            ro_vim_item_update["vim_status"] = vim_info["status"]
+        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+            ro_vim_item_update["vim_name"] = vim_info.get("name")
+        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
+                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+        elif vim_info["status"] == "DELETED":
+            ro_vim_item_update["vim_id"] = None
+            ro_vim_item_update["vim_details"] = "Deleted externally"
+        else:
+            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+        if ro_vim_item_update:
+            self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
+                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+        return task_status, ro_vim_item_update
+
+    def exec_vm(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            params = task["params"]
+            params_copy = deepcopy(params)
+            params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
+                                                         params_copy.pop("schema_version"), params_copy.pop("salt"))
+
+            target_vim.inject_user_key(**params_copy)
+            self.logger.debug(
+                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
+            return "DONE", params_copy["key"]
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def run(self):
+        # load database
+        self.logger.debug("Starting")
+        while True:
+            try:
+                task = self.task_queue.get(block=False if self.my_vims else True)
+                if task[0] == "terminate":
+                    break
+                if task[0] == "load_vim":
+                    self._load_vim(task[1])
+                continue
+            except queue.Empty:
+                pass
+
+            try:
+                busy = False
+                ro_task = self._get_db_task()
+                if ro_task:
+                    self._proccess_pending_tasks(ro_task)
+                    busy = True
+                if not busy:
+                    time.sleep(5)
+            except Exception as e:
+                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
+
+        self.logger.debug("Finishing")