Feature 7184 New Generation RO enhancemnt 07/10007/7 sol006
authortierno <alfonso.tiernosepulveda@telefonica.com>
Mon, 19 Oct 2020 16:38:00 +0000 (16:38 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Wed, 25 Nov 2020 18:32:36 +0000 (18:32 +0000)
New module vim_admin:
- check and load unattending ro_tasks
- check for vim updates and deletion in order to re-load it
- check vim_connectivity at creation edition
- Injection of ssh keys, retry if fails
- allow non load filesystem, because it is used only for cloud-init, but
it can be provided as text by LCM
- load sdn and wim targets
- sdn connectivity
- set network type 'data' when there are SRIOV or PT interfaces

Change-Id: I67147d1d500d60f0b2be1de214c97b1905168c8e
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
12 files changed:
Dockerfile-local
NG-RO/osm_ng_ro/ns.py
NG-RO/osm_ng_ro/ns_thread.py
NG-RO/osm_ng_ro/ro.cfg
NG-RO/osm_ng_ro/ro_main.py
NG-RO/osm_ng_ro/validation.py
NG-RO/osm_ng_ro/vim_admin.py [new file with mode: 0644]
RO-VIM-openstack/osm_rovim_openstack/vimconn_openstack.py
RO-plugin/osm_ro_plugin/openflow_conn.py
RO-plugin/osm_ro_plugin/sdn_dummy.py
RO-plugin/osm_ro_plugin/vim_dummy.py
RO/osm_ro/scripts/RO-start.sh

index 88fdfc8..b99a090 100644 (file)
@@ -50,6 +50,7 @@ RUN /root/RO/RO/osm_ro/scripts/install-osm-im.sh --develop && \
     python3 -m pip install -e /root/RO/RO-plugin && \
     python3 -m pip install -e /root/RO/RO && \
     python3 -m pip install -e /root/RO/RO-client && \
+    python3 -m pip install -e /root/RO/NG-RO && \
     python3 -m pip install -e /root/RO/RO-VIM-vmware && \
     python3 -m pip install -e /root/RO/RO-VIM-openstack && \
     python3 -m pip install -e /root/RO/RO-VIM-openvim && \
@@ -68,42 +69,27 @@ RUN /root/RO/RO/osm_ro/scripts/install-osm-im.sh --develop && \
     apt-get clean && \
     rm -rf /var/lib/apt/lists/*
 
-VOLUME /var/log/osm
-
 EXPOSE 9090
 
-# Two mysql databases are needed (DB and DB_OVIM). Can be hosted on same or separated containers
-# These ENV must be provided
-# RO_DB_HOST: host of the main
-# RO_DB_OVIM_HOST: ...        if empty RO_DB_HOST is assumed
-# RO_DB_ROOT_PASSWORD: this has to be provided first time for creating database. It will create and init only if empty!
-# RO_DB_OVIM_ROOT_PASSWORD: ...  if empty RO_DB_ROOT_PASSWORD is assumed
-# RO_DB_USER:    default value 'mano'
-# RO_DB_OVIM_USER:       default value 'mano'
-# RO_DB_PASSWORD:        default value 'manopw'
-# RO_DB_OVIM_PASSWORD:        default value 'manopw'
-# RO_DB_PORT:             default value '3306'
-# RO_DB_OVIM_PORT:        default value '3306'
-# RO_DB_NAME:             default value 'mano_db'
-# RO_DB_OVIM_NAME:        default value 'mano_vim_db'
-# RO_LOG_FILE:            default log to stderr if not defined
-
-ENV RO_DB_HOST="" \
-    RO_DB_OVIM_HOST="" \
-    RO_DB_ROOT_PASSWORD="" \
-    RO_DB_OVIM_ROOT_PASSWORD="" \
-    RO_DB_USER=mano \
-    RO_DB_OVIM_USER=mano \
-    RO_DB_PASSWORD=manopw \
-    RO_DB_OVIM_PASSWORD=manopw \
-    RO_DB_PORT=3306 \
-    RO_DB_OVIM_PORT=3306 \
-    RO_DB_NAME=mano_db \
-    RO_DB_OVIM_NAME=mano_vim_db \
-    OPENMANO_TENANT=osm \
-    RO_LOG_LEVEL=DEBUG
-
-CMD RO-start.sh
+ENV OSMRO_NG              True
+
+# database
+ENV OSMRO_DATABASE_DRIVER mongo
+ENV OSMRO_DATABASE_URI    mongodb://mongo:27017
+# ENV OSMRO_DATABASE_COMMONKEY  xxx
+# ENV OSMRO_DATABASE_USER  xxx
+# ENV OSMRO_DATABASE_PASSWORD  xxx
+
+# message
+ENV OSMRO_MESSAGE_DRIVER  kafka
+ENV OSMRO_MESSAGE_HOST    kafka
+ENV OSMRO_MESSAGE_PORT    9092
+
+# logs
+ENV OSMRO_LOG_LEVEL       DEBUG
+
+CMD python3 -m osm_ng_ro.ro_main
 
 # HEALTHCHECK --start-period=30s --interval=10s --timeout=5s --retries=12 \
-#  CMD curl --silent --fail localhost:9090/openmano/tenants || exit 1
+#  CMD curl --silent --fail http://localhost:9090/ro || exit 1
+
index eda6c48..a9c5ec0 100644 (file)
@@ -19,7 +19,7 @@
 import logging
 # import yaml
 from traceback import format_exc as traceback_format_exc
-from osm_ng_ro.ns_thread import NsWorker
+from osm_ng_ro.ns_thread import NsWorker, NsWorkerException, deep_get
 from osm_ng_ro.validation import validate_input, deploy_schema
 from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
 from osm_common.dbbase import DbException
@@ -30,7 +30,7 @@ from uuid import uuid4
 from threading import Lock
 from random import choice as random_choice
 from time import time
-from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
+from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
 from cryptography.hazmat.primitives import serialization as crypto_serialization
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.backends import default_backend as crypto_default_backend
@@ -82,10 +82,13 @@ class Ns(object):
         self.msg = None
         self.config = None
         # self.operations = None
-        self.logger = logging.getLogger("ro.ns")
+        self.logger = None
+        # ^ Getting logger inside method self.start because parent logger (ro) is not available yet.
+        # If done now it will not be linked to parent not getting its handler and level
         self.map_topic = {}
         self.write_lock = None
         self.assignment = {}
+        self.assignment_list = []
         self.next_worker = 0
         self.plugins = {}
         self.workers = []
@@ -102,6 +105,7 @@ class Ns(object):
         """
         self.config = config
         self.config["process_id"] = get_process_id()  # used for HA identity
+        self.logger = logging.getLogger("ro.ns")
         # check right version of common
         if versiontuple(common_version) < versiontuple(min_common_version):
             raise NsException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
@@ -125,6 +129,8 @@ class Ns(object):
                 elif config["storage"]["driver"] == "mongo":
                     self.fs = fsmongo.FsMongo()
                     self.fs.fs_connect(config["storage"])
+                elif config["storage"]["driver"] is None:
+                    pass
                 else:
                     raise NsException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                         config["storage"]["driver"]))
@@ -159,19 +165,56 @@ class Ns(object):
         for worker in self.workers:
             worker.insert_task(("terminate",))
 
-    def _create_worker(self, vim_account_id):
-        # TODO make use of the limit self.config["global"]["server.ns_threads"]
+    def _create_worker(self, target_id, load=True):
+        # Look for a thread not alive
         worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
-        if worker_id is None:
-            worker_id = len(self.workers)
-            self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
+        if worker_id:
+            # re-start worker
             self.workers[worker_id].start()
-        self.workers[worker_id].insert_task(("load_vim", vim_account_id))
+        else:
+            worker_id = len(self.workers)
+            if worker_id < self.config["global"]["server.ns_threads"]:
+                # create a new worker
+                self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
+                self.workers[worker_id].start()
+            else:
+                # reached maximum number of threads, assign VIM to an existing one
+                worker_id = self.next_worker
+                self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"]
+        if load:
+            self.workers[worker_id].insert_task(("load_vim", target_id))
         return worker_id
 
-    def _assign_vim(self, vim_account_id):
-        if vim_account_id not in self.assignment:
-            self.assignment[vim_account_id] = self._create_worker(vim_account_id)
+    def assign_vim(self, target_id):
+        if target_id not in self.assignment:
+            self.assignment[target_id] = self._create_worker(target_id)
+            self.assignment_list.append(target_id)
+
+    def reload_vim(self, target_id):
+        # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
+        # this is because database VIM information is cached for threads working with SDN
+        # if target_id in self.assignment:
+        #     worker_id = self.assignment[target_id]
+        #     self.workers[worker_id].insert_task(("reload_vim", target_id))
+        for worker in self.workers:
+            if worker.is_alive():
+                worker.insert_task(("reload_vim", target_id))
+
+    def unload_vim(self, target_id):
+        if target_id in self.assignment:
+            worker_id = self.assignment[target_id]
+            self.workers[worker_id].insert_task(("unload_vim", target_id))
+            del self.assignment[target_id]
+            self.assignment_list.remove(target_id)
+
+    def check_vim(self, target_id):
+        if target_id in self.assignment:
+            worker_id = self.assignment[target_id]
+        else:
+            worker_id = self._create_worker(target_id, load=False)
+
+        worker = self.workers[worker_id]
+        worker.insert_task(("check_vim", target_id))
 
     def _get_cloud_init(self, where):
         """
@@ -185,6 +228,9 @@ class Ns(object):
         if _type == "file":
             base_folder = vnfd["_admin"]["storage"]
             cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], name)
+            if not self.fs:
+                raise NsException("Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver"
+                                  .format(cloud_init_file))
             with self.fs.file_open(cloud_init_file, "r") as ci_file:
                 cloud_init_content = ci_file.read()
         elif _type == "vdu":
@@ -194,20 +240,16 @@ class Ns(object):
         return cloud_init_content
 
     def _parse_jinja2(self, cloud_init_content, params, context):
+
         try:
-            env = Environment()
-            ast = env.parse(cloud_init_content)
-            mandatory_vars = meta.find_undeclared_variables(ast)
-            if mandatory_vars:
-                for var in mandatory_vars:
-                    if not params or var not in params:
-                        raise NsException(
-                            "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
-                            "inside the 'additionalParamsForVnf' block".format(var, context))
-            template = Template(cloud_init_content)
+            env = Environment(undefined=StrictUndefined)
+            template = env.from_string(cloud_init_content)
             return template.render(params or {})
-
-        except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+        except UndefinedError as e:
+            raise NsException(
+                "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
+                "inside the 'additionalParamsForVnf' block".format(e, context))
+        except (TemplateError, TemplateNotFound) as e:
             raise NsException("Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(context, e))
 
     def _create_db_ro_nsrs(self, nsr_id, now):
@@ -226,6 +268,9 @@ class Ns(object):
                 crypto_serialization.PublicFormat.OpenSSH
             )
             private_key = private_key.decode('utf8')
+            # Change first line because Paramiko needs a explicit start with 'BEGIN RSA PRIVATE KEY'
+            i = private_key.find("\n")
+            private_key = "-----BEGIN RSA PRIVATE KEY-----" + private_key[i:]
             public_key = public_key.decode('utf8')
         except Exception as e:
             raise NsException("Cannot create ssh-keys: {}".format(e))
@@ -247,27 +292,24 @@ class Ns(object):
         return db_content
 
     def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
-        print("ns.deploy session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
+        self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
         validate_input(indata, deploy_schema)
         action_id = indata.get("action_id", str(uuid4()))
         task_index = 0
         # get current deployment
-        db_nsr = None
-        # db_nslcmop = None
         db_nsr_update = {}        # update operation on nsrs
         db_vnfrs_update = {}
-        # db_nslcmop_update = {}        # update operation on nslcmops
         db_vnfrs = {}     # vnf's info indexed by _id
-        vdu2cloud_init = {}
+        nb_ro_tasks = 0  # for logging
+        vdu2cloud_init = indata.get("cloud_init_content") or {}
         step = ''
         logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
         self.logger.debug(logging_text + "Enter")
         try:
             step = "Getting ns and vnfr record from db"
-            # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
             db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
-            db_ro_tasks = []
             db_new_tasks = []
+            tasks_by_target_record_id = {}
             # read from db: vnf's of this ns
             step = "Getting vnfrs from db"
             db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
@@ -293,12 +335,13 @@ class Ns(object):
                         break
                     index += 1
 
-            def _create_task(item, action, target_record, target_record_id, extra_dict=None):
+            def _create_task(target_id, item, action, target_record, target_record_id, extra_dict=None):
                 nonlocal task_index
                 nonlocal action_id
                 nonlocal nsr_id
 
                 task = {
+                    "target_id": target_id,  # it will be removed before pushing at database
                     "action_id": action_id,
                     "nsr_id": nsr_id,
                     "task_id": "{}:{}".format(action_id, task_index),
@@ -313,17 +356,17 @@ class Ns(object):
                 task_index += 1
                 return task
 
-            def _create_ro_task(vim_account_id, item, action, target_record, target_record_id, extra_dict=None):
+            def _create_ro_task(target_id, task):
                 nonlocal action_id
                 nonlocal task_index
                 nonlocal now
 
-                _id = action_id + ":" + str(task_index)
+                _id = task["task_id"]
                 db_ro_task = {
                     "_id": _id,
                     "locked_by": None,
                     "locked_at": 0.0,
-                    "target_id": "vim:" + vim_account_id,
+                    "target_id": target_id,
                     "vim_info": {
                         "created": False,
                         "created_items": None,
@@ -336,11 +379,11 @@ class Ns(object):
                     "modified_at": now,
                     "created_at": now,
                     "to_check_at": now,
-                    "tasks": [_create_task(item, action, target_record, target_record_id, extra_dict)],
+                    "tasks": [task],
                 }
                 return db_ro_task
 
-            def _process_image_params(target_image, vim_info):
+            def _process_image_params(target_image, vim_info, target_record_id):
                 find_params = {}
                 if target_image.get("image"):
                     find_params["filter_dict"] = {"name": target_image.get("image")}
@@ -350,7 +393,7 @@ class Ns(object):
                     find_params["filter_dict"] = {"checksum": target_image.get("image_checksum")}
                 return {"find_params": find_params}
 
-            def _process_flavor_params(target_flavor, vim_info):
+            def _process_flavor_params(target_flavor, vim_info, target_record_id):
 
                 def _get_resource_allocation_params(quota_descriptor):
                     """
@@ -375,9 +418,10 @@ class Ns(object):
                     "ram": int(target_flavor["memory-mb"]),
                     "vcpus": target_flavor["vcpu-count"],
                 }
+                numa = {}
+                extended = {}
                 if target_flavor.get("guest-epa"):
                     extended = {}
-                    numa = {}
                     epa_vcpu_set = False
                     if target_flavor["guest-epa"].get("numa-node-policy"):
                         numa_node_policy = target_flavor["guest-epa"].get("numa-node-policy")
@@ -438,9 +482,39 @@ class Ns(object):
                 extra_dict["params"] = {"flavor_data": flavor_data_name}
                 return extra_dict
 
-            def _process_net_params(target_vld, vim_info):
+            def _ip_profile_2_ro(ip_profile):
+                if not ip_profile:
+                    return None
+                ro_ip_profile = {
+                    "ip_version": "IPv4" if "v4" in ip_profile.get("ip-version", "ipv4") else "IPv6",
+                    "subnet_address": ip_profile.get("subnet-address"),
+                    "gateway_address": ip_profile.get("gateway-address"),
+                    "dhcp_enabled": ip_profile["dhcp-params"].get("enabled", True),
+                    "dhcp_start_address": ip_profile["dhcp-params"].get("start-address"),
+                    "dhcp_count": ip_profile["dhcp-params"].get("count"),
+
+                }
+                if ip_profile.get("dns-server"):
+                    ro_ip_profile["dns_address"] = ";".join([v["address"] for v in ip_profile["dns-server"]])
+                if ip_profile.get('security-group'):
+                    ro_ip_profile["security_group"] = ip_profile['security-group']
+                return ro_ip_profile
+
+            def _process_net_params(target_vld, vim_info, target_record_id):
                 nonlocal indata
                 extra_dict = {}
+
+                if vim_info.get("sdn"):
+                    # vnf_preffix = "vnfrs:{}".format(vnfr_id)
+                    # ns_preffix = "nsrs:{}".format(nsr_id)
+                    vld_target_record_id, _, _ = target_record_id.rpartition(".")  # remove the ending ".sdn
+                    extra_dict["params"] = {k: vim_info[k] for k in ("sdn-ports", "target_vim", "vlds", "type")
+                                            if vim_info.get(k)}
+                    # TODO needed to add target_id in the dependency.
+                    if vim_info.get("target_vim"):
+                        extra_dict["depends_on"] = [vim_info.get("target_vim") + " " + vld_target_record_id]
+                    return extra_dict
+
                 if vim_info.get("vim_network_name"):
                     extra_dict["find_params"] = {"filter_dict": {"name": vim_info.get("vim_network_name")}}
                 elif vim_info.get("vim_network_id"):
@@ -451,7 +525,7 @@ class Ns(object):
                     # create
                     extra_dict["params"] = {
                         "net_name": "{}-{}".format(indata["name"][:16], target_vld.get("name", target_vld["id"])[:16]),
-                        "ip_profile": vim_info.get('ip_profile'),
+                        "ip_profile": _ip_profile_2_ro(vim_info.get('ip_profile')),
                         "provider_network_profile": vim_info.get('provider_network'),
                     }
                     if not target_vld.get("underlay"):
@@ -460,12 +534,13 @@ class Ns(object):
                         extra_dict["params"]["net_type"] = "ptp" if target_vld.get("type") == "ELINE" else "data"
                 return extra_dict
 
-            def _process_vdu_params(target_vdu, vim_info):
+            def _process_vdu_params(target_vdu, vim_info, target_record_id):
                 nonlocal vnfr_id
                 nonlocal nsr_id
                 nonlocal indata
                 nonlocal vnfr
                 nonlocal vdu2cloud_init
+                nonlocal tasks_by_target_record_id
                 vnf_preffix = "vnfrs:{}".format(vnfr_id)
                 ns_preffix = "nsrs:{}".format(nsr_id)
                 image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
@@ -478,15 +553,16 @@ class Ns(object):
                     else:
                         net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
                     extra_dict["depends_on"].append(net_text)
-                    net_item = {
-                        "name": interface["name"],
-                        "net_id": "TASK-" + net_text,
-                        "vpci": interface.get("vpci"),
-                        "type": "virtual",
-                        # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
-                        # TODO floating_ip: True/False (or it can be None)
-                    }
+                    net_item = {x: v for x, v in interface.items() if x in
+                                ("name", "vpci", "port_security", "port_security_disable_strategy", "floating_ip")}
+                    net_item["net_id"] = "TASK-" + net_text
+                    net_item["type"] = "virtual"
+                    # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
+                    # TODO floating_ip: True/False (or it can be None)
                     if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+                        # mark the net create task as type data
+                        if deep_get(tasks_by_target_record_id, net_text, "params", "net_type"):
+                            tasks_by_target_record_id[net_text]["params"]["net_type"] = "data"
                         net_item["use"] = "data"
                         net_item["model"] = interface["type"]
                         net_item["type"] = interface["type"]
@@ -496,12 +572,15 @@ class Ns(object):
                     else:   # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
                         net_item["use"] = "bridge"
                         net_item["model"] = interface.get("type")
+                    if interface.get("ip-address"):
+                        net_item["ip_address"] = interface["ip-address"]
+                    if interface.get("mac-address"):
+                        net_item["mac_address"] = interface["mac-address"]
                     net_list.append(net_item)
                     if interface.get("mgmt-vnf"):
                         extra_dict["mgmt_vnf_interface"] = iface_index
                     elif interface.get("mgmt-interface"):
                         extra_dict["mgmt_vdu_interface"] = iface_index
-
                 # cloud config
                 cloud_config = {}
                 if target_vdu.get("cloud-init"):
@@ -536,84 +615,97 @@ class Ns(object):
                 return extra_dict
 
             def _process_items(target_list, existing_list, db_record, db_update, db_path, item, process_params):
-                nonlocal db_ro_tasks
                 nonlocal db_new_tasks
+                nonlocal tasks_by_target_record_id
                 nonlocal task_index
 
-                # ensure all the target_list elements has an "id". If not assign the index
+                # ensure all the target_list elements has an "id". If not assign the index as id
                 for target_index, tl in enumerate(target_list):
                     if tl and not tl.get("id"):
                         tl["id"] = str(target_index)
 
-                # step 1 networks to be deleted/updated
-                for vld_index, existing_vld in enumerate(existing_list):
-                    target_vld = next((vld for vld in target_list if vld["id"] == existing_vld["id"]), None)
-                    for existing_vim_index, existing_vim_info in enumerate(existing_vld.get("vim_info", ())):
-                        if not existing_vim_info:
+                # step 1 items (networks,vdus,...) to be deleted/updated
+                for item_index, existing_item in enumerate(existing_list):
+                    target_item = next((t for t in target_list if t["id"] == existing_item["id"]), None)
+                    for target_vim, existing_viminfo in existing_item.get("vim_info", {}).items():
+                        if existing_viminfo is None:
                             continue
-                        if target_vld:
-                            target_viminfo = next((target_viminfo for target_viminfo in target_vld.get("vim_info", ())
-                                                   if existing_vim_info["vim_account_id"] == target_viminfo[
-                                                       "vim_account_id"]), None)
+                        if target_item:
+                            target_viminfo = target_item.get("vim_info", {}).get(target_vim)
                         else:
                             target_viminfo = None
-                        if not target_viminfo:
+                        if target_viminfo is None:
                             # must be deleted
-                            self._assign_vim(existing_vim_info["vim_account_id"])
-                            db_new_tasks.append(_create_task(
-                                item, "DELETE",
-                                target_record="{}.{}.vim_info.{}".format(db_record, vld_index, existing_vim_index),
-                                target_record_id="{}.{}".format(db_record, existing_vld["id"])))
+                            self.assign_vim(target_vim)
+                            target_record_id = "{}.{}".format(db_record, existing_item["id"])
+                            item_ = item
+                            if target_vim.startswith("sdn"):
+                                # item must be sdn-net instead of net if target_vim is a sdn
+                                item_ = "sdn_net"
+                                target_record_id += ".sdn"
+                            task = _create_task(
+                                target_vim, item_, "DELETE",
+                                target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
+                                target_record_id=target_record_id)
+                            tasks_by_target_record_id[target_record_id] = task
+                            db_new_tasks.append(task)
                             # TODO delete
                     # TODO check one by one the vims to be created/deleted
 
-                # step 2 networks to be created
-                for target_vld in target_list:
-                    vld_index = -1
-                    for vld_index, existing_vld in enumerate(existing_list):
-                        if existing_vld["id"] == target_vld["id"]:
+                # step 2 items (networks,vdus,...) to be created
+                for target_item in target_list:
+                    item_index = -1
+                    for item_index, existing_item in enumerate(existing_list):
+                        if existing_item["id"] == target_item["id"]:
                             break
                     else:
-                        vld_index += 1
-                        db_update[db_path + ".{}".format(vld_index)] = target_vld
-                        existing_list.append(target_vld)
-                        existing_vld = None
+                        item_index += 1
+                        db_update[db_path + ".{}".format(item_index)] = target_item
+                        existing_list.append(target_item)
+                        existing_item = None
 
-                    for vim_index, vim_info in enumerate(target_vld["vim_info"]):
+                    for target_vim, target_viminfo in target_item.get("vim_info", {}).items():
                         existing_viminfo = None
-                        if existing_vld:
-                            existing_viminfo = next(
-                                (existing_viminfo for existing_viminfo in existing_vld.get("vim_info", ())
-                                 if vim_info["vim_account_id"] == existing_viminfo["vim_account_id"]), None)
+                        if existing_item:
+                            existing_viminfo = existing_item.get("vim_info", {}).get(target_vim)
                         # TODO check if different. Delete and create???
                         # TODO delete if not exist
-                        if existing_viminfo:
+                        if existing_viminfo is not None:
                             continue
 
-                        extra_dict = process_params(target_vld, vim_info)
-
-                        self._assign_vim(vim_info["vim_account_id"])
-                        db_ro_tasks.append(_create_ro_task(
-                            vim_info["vim_account_id"], item, "CREATE",
-                            target_record="{}.{}.vim_info.{}".format(db_record, vld_index, vim_index),
-                            target_record_id="{}.{}".format(db_record, target_vld["id"]),
-                            extra_dict=extra_dict))
-
-                        db_update[db_path + ".{}".format(vld_index)] = target_vld
+                        target_record_id = "{}.{}".format(db_record, target_item["id"])
+                        item_ = item
+                        if target_vim.startswith("sdn"):
+                            # item must be sdn-net instead of net if target_vim is a sdn
+                            item_ = "sdn_net"
+                            target_record_id += ".sdn"
+                        extra_dict = process_params(target_item, target_viminfo, target_record_id)
+
+                        self.assign_vim(target_vim)
+                        task = _create_task(
+                            target_vim, item_, "CREATE",
+                            target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
+                            target_record_id=target_record_id,
+                            extra_dict=extra_dict)
+                        tasks_by_target_record_id[target_record_id] = task
+                        db_new_tasks.append(task)
+                        if target_item.get("common_id"):
+                            task["common_id"] = target_item["common_id"]
+
+                        db_update[db_path + ".{}".format(item_index)] = target_item
 
             def _process_action(indata):
-                nonlocal db_ro_tasks
                 nonlocal db_new_tasks
                 nonlocal task_index
                 nonlocal db_vnfrs
                 nonlocal db_ro_nsr
 
-                if indata["action"] == "inject_ssh_key":
-                    key = indata.get("key")
-                    user = indata.get("user")
-                    password = indata.get("password")
+                if indata["action"]["action"] == "inject_ssh_key":
+                    key = indata["action"].get("key")
+                    user = indata["action"].get("user")
+                    password = indata["action"].get("password")
                     for vnf in indata.get("vnf", ()):
-                        if vnf.get("_id") not in db_vnfrs:
+                        if vnf["_id"] not in db_vnfrs:
                             raise NsException("Invalid vnf={}".format(vnf["_id"]))
                         db_vnfr = db_vnfrs[vnf["_id"]]
                         for target_vdu in vnf.get("vdur", ()):
@@ -621,13 +713,13 @@ class Ns(object):
                                                     i_v[1]["id"] == target_vdu["id"]), (None, None))
                             if not vdur:
                                 raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
-                            vim_info = vdur["vim_info"][0]
-                            self._assign_vim(vim_info["vim_account_id"])
+                            target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items())
+                            self.assign_vim(target_vim)
                             target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
                             extra_dict = {
                                 "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
                                 "params": {
-                                    "ip_address": vdur.gt("ip_address"),
+                                    "ip_address": vdur.get("ip-address"),
                                     "user": user,
                                     "key": key,
                                     "password": password,
@@ -636,10 +728,11 @@ class Ns(object):
                                     "schema_version": db_ro_nsr["_admin"]["schema_version"]
                                 }
                             }
-                            db_ro_tasks.append(_create_ro_task(vim_info["vim_account_id"], "vdu", "EXEC",
-                                                               target_record=target_record,
-                                                               target_record_id=None,
-                                                               extra_dict=extra_dict))
+                            task = _create_task(target_vim, "vdu", "EXEC",
+                                                target_record=target_record,
+                                                target_record_id=None,
+                                                extra_dict=extra_dict)
+                            db_new_tasks.append(task)
 
             with self.write_lock:
                 if indata.get("action"):
@@ -653,13 +746,13 @@ class Ns(object):
                                    db_path="vld", item="net", process_params=_process_net_params)
 
                     step = "process NS images"
-                    _process_items(target_list=indata["image"] or [], existing_list=db_nsr.get("image") or [],
+                    _process_items(target_list=indata.get("image") or [], existing_list=db_nsr.get("image") or [],
                                    db_record="nsrs:{}:image".format(nsr_id),
                                    db_update=db_nsr_update, db_path="image", item="image",
                                    process_params=_process_image_params)
 
                     step = "process NS flavors"
-                    _process_items(target_list=indata["flavor"] or [], existing_list=db_nsr.get("flavor") or [],
+                    _process_items(target_list=indata.get("flavor") or [], existing_list=db_nsr.get("flavor") or [],
                                    db_record="nsrs:{}:flavor".format(nsr_id),
                                    db_update=db_nsr_update, db_path="flavor", item="flavor",
                                    process_params=_process_flavor_params)
@@ -681,17 +774,27 @@ class Ns(object):
                                        db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu",
                                        process_params=_process_vdu_params)
 
-                step = "Updating database, Creating ro_tasks"
-                if db_ro_tasks:
-                    self.db.create_list("ro_tasks", db_ro_tasks)
-                step = "Updating database, Appending tasks to ro_tasks"
-                for task in db_new_tasks:
-                    if not self.db.set_one("ro_tasks", q_filter={"tasks.target_record": task["target_record"]},
+                for db_task in db_new_tasks:
+                    step = "Updating database, Appending tasks to ro_tasks"
+                    target_id = db_task.pop("target_id")
+                    common_id = db_task.get("common_id")
+                    if common_id:
+                        if self.db.set_one("ro_tasks",
+                                           q_filter={"target_id": target_id,
+                                                     "tasks.common_id": common_id},
                                            update_dict={"to_check_at": now, "modified_at": now},
-                                           push={"tasks": task}, fail_on_empty=False):
-                        self.logger.error(logging_text + "Cannot find task for target_record={}".
-                                          format(task["target_record"]))
-                    # TODO something else appart from logging?
+                                           push={"tasks": db_task}, fail_on_empty=False):
+                            continue
+                    if not self.db.set_one("ro_tasks",
+                                           q_filter={"target_id": target_id,
+                                                     "tasks.target_record": db_task["target_record"]},
+                                           update_dict={"to_check_at": now, "modified_at": now},
+                                           push={"tasks": db_task}, fail_on_empty=False):
+                        # Create a ro_task
+                        step = "Updating database, Creating ro_tasks"
+                        db_ro_task = _create_ro_task(target_id, db_task)
+                        nb_ro_tasks += 1
+                        self.db.create("ro_tasks", db_ro_task)
                 step = "Updating database, nsrs"
                 if db_nsr_update:
                     self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update)
@@ -700,7 +803,8 @@ class Ns(object):
                         step = "Updating database, vnfrs={}".format(vnfr_id)
                         self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
 
-            self.logger.debug(logging_text + "Exit")
+            self.logger.debug(logging_text + "Exit. Created {} ro_tasks; {} tasks".format(nb_ro_tasks,
+                                                                                          len(db_new_tasks)))
             return {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, action_id, True
 
         except Exception as e:
@@ -712,50 +816,18 @@ class Ns(object):
             raise NsException(e)
 
     def delete(self, session, indata, version, nsr_id, *args, **kwargs):
-        print("ns.delete session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
-        # TODO del when ALL "tasks.nsr_id" are None of nsr_id
+        self.logger.debug("ns.delete version={} nsr_id={}".format(version, nsr_id))
         # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
-        retries = 5
-        for retry in range(retries):
-            with self.write_lock:
-                ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
-                if not ro_tasks:
-                    break
-                now = time()
-                conflict = False
-                for ro_task in ro_tasks:
-                    db_update = {}
-                    to_delete = True
-                    for index, task in enumerate(ro_task["tasks"]):
-                        if not task:
-                            pass
-                        elif task["nsr_id"] == nsr_id:
-                            db_update["tasks.{}".format(index)] = None
-                        else:
-                            to_delete = False  # used by other nsr, cannot be deleted
-                    # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
-                    if to_delete:
-                        if not self.db.del_one("ro_tasks",
-                                               q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
-                                               fail_on_empty=False):
-                            conflict = True
-                    elif db_update:
-                        db_update["modified_at"] = now
-                        if not self.db.set_one("ro_tasks",
-                                               q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
-                                               update_dict=db_update,
-                                               fail_on_empty=False):
-                            conflict = True
-                if not conflict:
-                    break
-        else:
-            raise NsException("Exceeded {} retries".format(retries))
-
+        with self.write_lock:
+            try:
+                NsWorker.delete_db_tasks(self.db, nsr_id, None)
+            except NsWorkerException as e:
+                raise NsException(e)
         return None, None, True
 
     def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
-        print("ns.status session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version,
-                                                                                         nsr_id, action_id))
+        # self.logger.debug("ns.status version={} nsr_id={}, action_id={} indata={}"
+        #                   .format(version, nsr_id, action_id, indata))
         task_list = []
         done = 0
         total = 0
@@ -764,12 +836,14 @@ class Ns(object):
         details = []
         for ro_task in ro_tasks:
             for task in ro_task["tasks"]:
-                if task["action_id"] == action_id:
+                if task and task["action_id"] == action_id:
                     task_list.append(task)
                     total += 1
                     if task["status"] == "FAILED":
                         global_status = "FAILED"
-                        details.append(ro_task.get("vim_details", ''))
+                        error_text = "Error at {} {}: {}".format(task["action"].lower(), task["item"],
+                                                                 ro_task["vim_info"].get("vim_details") or "unknown")
+                        details.append(error_text)
                     elif task["status"] in ("SCHEDULED", "BUILD"):
                         if global_status != "FAILED":
                             global_status = "BUILD"
index 0b96c53..013cae9 100644 (file)
@@ -28,15 +28,20 @@ import threading
 import time
 import queue
 import logging
+import yaml
 from pkg_resources import iter_entry_points
 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
 from osm_common.dbbase import DbException
 # from osm_common.fsbase import FsException
 # from osm_common.msgbase import MsgException
 from osm_ro_plugin.vim_dummy import VimDummyConnector
-from osm_ro_plugin import vimconn
+from osm_ro_plugin.sdn_dummy import SdnDummyConnector
+from osm_ro_plugin import vimconn, sdnconn
 from copy import deepcopy
 from unittest.mock import Mock
+from http import HTTPStatus
+from os import mkdir
+from shutil import rmtree
 
 __author__ = "Alfonso Tierno"
 __date__ = "$28-Sep-2017 12:07:15$"
@@ -68,619 +73,252 @@ class FailingConnector:
         for method in dir(vimconn.VimConnector):
             if method[0] != "_":
                 setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
+        for method in dir(sdnconn.SdnConnectorBase):
+            if method[0] != "_":
+                setattr(self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg)))
 
 
 class NsWorkerExceptionNotFound(NsWorkerException):
     pass
 
 
-class NsWorker(threading.Thread):
-    REFRESH_BUILD = 5  # 5 seconds
-    REFRESH_ACTIVE = 60  # 1 minute
-    REFRESH_ERROR = 600
-    REFRESH_IMAGE = 3600 * 10
-    REFRESH_DELETE = 3600 * 10
-    QUEUE_SIZE = 2000
-    # TODO delete assigment_lock = Lock()
-    terminate = False
-    # TODO delete assignment = {}
-    MAX_TIME_LOCKED = 3600
-
-    def __init__(self, worker, config, plugins, db):
-        """Init a thread.
-        Arguments:
-            'id' number of thead
-            'name' name of thread
-            'host','user':  host ip or name to manage and user
-            'db', 'db_lock': database class and lock to use it in exclusion
-        """
-        threading.Thread.__init__(self)
-        self.config = config
-        self.plugins = plugins
-        self.plugin_name = "unknown"
-        self.logger = logging.getLogger('ro.worker{}'.format("worker"))
-        self.worker_id = worker
-        self.task_queue = queue.Queue(self.QUEUE_SIZE)
-        self.my_vims = {}   # targetvim: vimplugin class
-        self.db_vims = {}   # targetvim: vim information from database
-        self.vim_targets = []   # targetvim list
-        self.my_id = config["process_id"] + ":" + str(worker)
+class VimInteractionBase:
+    """ Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
+    It implements methods that does nothing and return ok"""
+    def __init__(self, db, my_vims, db_vims, logger):
         self.db = db
-        self.item2create = {
-            "net": self.new_net,
-            "vdu": self.new_vm,
-            "image": self.new_image,
-            "flavor": self.new_flavor,
-        }
-        self.item2refresh = {
-            "net": self.refresh_net,
-            "vdu": self.refresh_vm,
-            "image": self.refresh_ok,
-            "flavor": self.refresh_ok,
-        }
-        self.item2delete = {
-            "net": self.del_net,
-            "vdu": self.del_vm,
-            "image": self.delete_ok,
-            "flavor": self.del_flavor,
-        }
-        self.item2action = {
-            "vdu": self.exec_vm,
-        }
-        self.time_last_task_processed = None
-
-    def insert_task(self, task):
-        try:
-            self.task_queue.put(task, False)
-            return None
-        except queue.Full:
-            raise NsWorkerException("timeout inserting a task")
-
-    def terminate(self):
-        self.insert_task("exit")
+        self.logger = logger
+        self.my_vims = my_vims
+        self.db_vims = db_vims
 
-    def del_task(self, task):
-        with self.task_lock:
-            if task["status"] == "SCHEDULED":
-                task["status"] = "SUPERSEDED"
-                return True
-            else:  # task["status"] == "processing"
-                self.task_lock.release()
-                return False
+    def new(self, ro_task, task_index, task_depends):
+        return "BUILD", {}
 
-    def _load_plugin(self, name, type="vim"):
-        # type can be vim or sdn
-        if "rovim_dummy" not in self.plugins:
-            self.plugins["rovim_dummy"] = VimDummyConnector
-        if name in self.plugins:
-            return self.plugins[name]
-        try:
-            for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
-                self.plugins[name] = v.load()
-        except Exception as e:
-            self.logger.critical("Cannot load osm_{}: {}".format(name, e))
-            if name:
-                self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
-        if name and name not in self.plugins:
-            error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
-                         " registered".format(t=type, n=name)
-            self.logger.critical(error_text)
-            self.plugins[name] = FailingConnector(error_text)
+    def refresh(self, ro_task):
+        """skip calling VIM to get image, flavor status. Assumes ok"""
+        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
+            return "FAILED", {}
+        return "DONE", {}
 
-        return self.plugins[name]
+    def delete(self, ro_task, task_index):
+        """skip calling VIM to delete image. Assumes ok"""
+        return "DONE", {}
 
-    def _load_vim(self, vim_account_id):
-        target_id = "vim:" + vim_account_id
-        plugin_name = ""
-        vim = None
-        try:
-            step = "Getting vim={} from db".format(vim_account_id)
-            vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+    def exec(self, ro_task, task_index, task_depends):
+        return "DONE", None, None
 
-            # if deep_get(vim, "config", "sdn-controller"):
-            #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
-            #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
 
-            step = "Decrypt password"
-            schema_version = vim.get("schema_version")
-            self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
-                                           schema_version=schema_version, salt=vim_account_id)
-
-            step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
-            plugin_name = "rovim_" + vim["vim_type"]
-            vim_module_conn = self._load_plugin(plugin_name)
-            self.my_vims[target_id] = vim_module_conn(
-                uuid=vim['_id'], name=vim['name'],
-                tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
-                url=vim['vim_url'], url_admin=None,
-                user=vim['vim_user'], passwd=vim['vim_password'],
-                config=vim.get('config'), persistent_info={}
-            )
-            self.vim_targets.append(target_id)
-            self.db_vims[target_id] = vim
-            self.error_status = None
-            self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
-                vim_account_id, plugin_name))
-        except Exception as e:
-            self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
-                vim_account_id, plugin_name, step, e))
-            self.db_vims[target_id] = vim or {}
-            self.my_vims[target_id] = FailingConnector(str(e))
-            self.error_status = "Error loading vimconnector: {}".format(e)
+class VimInteractionNet(VimInteractionBase):
 
-    def _get_db_task(self):
-        """
-        Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
-        :return: None
-        """
-        now = time.time()
-        if not self.time_last_task_processed:
-            self.time_last_task_processed = now
+    def new(self, ro_task, task_index, task_depends):
+        vim_net_id = None
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
         try:
-            while True:
-                locked = self.db.set_one(
-                    "ro_tasks",
-                    q_filter={"target_id": self.vim_targets,
-                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
-                              "to_check_at.lt": self.time_last_task_processed},
-                    update_dict={"locked_by": self.my_id, "locked_at": now},
-                    fail_on_empty=False)
-                if locked:
-                    # read and return
-                    ro_task = self.db.get_one(
-                        "ro_tasks",
-                        q_filter={"target_id": self.vim_targets,
-                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                  "locked_at": now})
-                    return ro_task
-                if self.time_last_task_processed == now:
-                    self.time_last_task_processed = None
-                    return None
+            # FIND
+            if task.get("find_params"):
+                # if management, get configuration of VIM
+                if task["find_params"].get("filter_dict"):
+                    vim_filter = task["find_params"]["filter_dict"]
+                elif task["find_params"].get("mgmt"):   # mamagement network
+                    if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
+                        vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
+                    elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
+                        vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
+                    else:
+                        vim_filter = {"name": task["find_params"]["name"]}
                 else:
-                    self.time_last_task_processed = now
-                    # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
-
-        except DbException as e:
-            self.logger.error("Database exception at _get_db_task: {}".format(e))
-        except Exception as e:
-            self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
-        return None
+                    raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
 
-    def _delete_task(self, ro_task, task_index, task_depends, db_update):
-        """
-        Determine if this task need to be done or superseded
-        :return: None
-        """
-        my_task = ro_task["tasks"][task_index]
-        task_id = my_task["task_id"]
-        needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
-        if my_task["status"] == "FAILED":
-            return None, None  # TODO need to be retry??
-        try:
-            for index, task in enumerate(ro_task["tasks"]):
-                if index == task_index:
-                    continue  # own task
-                if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
-                    # set to finished
-                    db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
-                elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
-                    needed_delete = False
-            if needed_delete:
-                return self.item2delete[my_task["item"]](ro_task, task_index)
+                vim_nets = target_vim.get_network_list(vim_filter)
+                if not vim_nets and not task.get("params"):
+                    raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
+                        task.get("find_params")))
+                elif len(vim_nets) > 1:
+                    raise NsWorkerException(
+                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
+                if vim_nets:
+                    vim_net_id = vim_nets[0]["id"]
             else:
-                return "SUPERSEDED", None
-        except Exception as e:
-            if not isinstance(e, NsWorkerException):
-                self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
-                                     exc_info=True)
-            return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                # CREATE
+                params = task["params"]
+                vim_net_id, created_items = target_vim.new_network(**params)
+                created = True
 
-    def _create_task(self, ro_task, task_index, task_depends, db_update):
-        """
-        Determine if this task need to be created
-        :return: None
-        """
-        my_task = ro_task["tasks"][task_index]
-        task_id = my_task["task_id"]
-        task_status = None
-        if my_task["status"] == "FAILED":
-            return None, None  # TODO need to be retry??
-        elif my_task["status"] == "SCHEDULED":
-            # check if already created by another task
-            for index, task in enumerate(ro_task["tasks"]):
-                if index == task_index:
-                    continue  # own task
-                if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
-                    return task["status"], "COPY_VIM_INFO"
+            ro_vim_item_update = {"vim_id": vim_net_id,
+                                  "vim_status": "BUILD",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
+            return "BUILD", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
 
-            try:
-                task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
-                # TODO update other CREATE tasks
-            except Exception as e:
-                if not isinstance(e, NsWorkerException):
-                    self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
-                task_status = "FAILED"
-                ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
-                # TODO update    ro_vim_item_update
-            return task_status, ro_vim_item_update
-        else:
-            return None, None
+    def refresh(self, ro_task):
+        """Call VIM to get network status"""
+        ro_task_id = ro_task["_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
 
-    def _get_dependency(self, task_id, ro_task=None, target_id=None):
-        if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
-            ro_task_dependency = self.db.get_one(
-                "ro_tasks",
-                q_filter={"target_id": target_id,
-                          "tasks.target_record_id": task_id
-                          },
-                fail_on_empty=False)
-            if ro_task_dependency:
-                for task_index, task in enumerate(ro_task_dependency["tasks"]):
-                    if task["target_record_id"] == task_id:
-                        return ro_task_dependency, task_index
+        vim_id = ro_task["vim_info"]["vim_id"]
+        net_to_refresh_list = [vim_id]
+        try:
+            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+            vim_info = vim_dict[vim_id]
+            if vim_info["status"] == "ACTIVE":
+                task_status = "DONE"
+            elif vim_info["status"] == "BUILD":
+                task_status = "BUILD"
+            else:
+                task_status = "FAILED"
+        except vimconn.VimConnException as e:
+            # Mark all tasks at VIM_ERROR status
+            self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+            task_status = "FAILED"
 
+        ro_vim_item_update = {}
+        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+            ro_vim_item_update["vim_status"] = vim_info["status"]
+        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+            ro_vim_item_update["vim_name"] = vim_info.get("name")
+        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
+                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
+        elif vim_info["status"] == "DELETED":
+            ro_vim_item_update["vim_id"] = None
+            ro_vim_item_update["vim_details"] = "Deleted externally"
         else:
-            if ro_task:
-                for task_index, task in enumerate(ro_task["tasks"]):
-                    if task["task_id"] == task_id:
-                        return ro_task, task_index
-            ro_task_dependency = self.db.get_one(
-                "ro_tasks",
-                q_filter={"tasks.ANYINDEX.task_id": task_id,
-                          "tasks.ANYINDEX.target_record.ne": None
-                          },
-                fail_on_empty=False)
-            if ro_task_dependency:
-                for task_index, task in ro_task_dependency["tasks"]:
-                    if task["task_id"] == task_id:
-                        return ro_task_dependency, task_index
-        raise NsWorkerException("Cannot get depending task {}".format(task_id))
-
-    def _proccess_pending_tasks(self, ro_task):
-        ro_task_id = ro_task["_id"]
-        now = time.time()
-        next_check_at = now + (24*60*60)   # one day
-        db_ro_task_update = {}
-
-        def _update_refresh(new_status):
-            # compute next_refresh
-            nonlocal task
-            nonlocal next_check_at
-            nonlocal db_ro_task_update
-            nonlocal ro_task
-
-            next_refresh = time.time()
-            if task["item"] in ("image", "flavor"):
-                next_refresh += self.REFRESH_IMAGE
-            elif new_status == "BUILD":
-                next_refresh += self.REFRESH_BUILD
-            elif new_status == "DONE":
-                next_refresh += self.REFRESH_ACTIVE
-            else:
-                next_refresh += self.REFRESH_ERROR
-            next_check_at = min(next_check_at, next_refresh)
-            db_ro_task_update["vim_info.refresh_at"] = next_refresh
-            ro_task["vim_info"]["refresh_at"] = next_refresh
+            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+        if ro_vim_item_update:
+            self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+        return task_status, ro_vim_item_update
 
+    def delete(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        net_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
         try:
-            # 0 get task_status_create
-            task_status_create = None
-            task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
-                                t["status"] in ("BUILD", "DONE")), None)
-            if task_create:
-                task_status_create = task_create["status"]
-            # 1. look for SCHEDULED or if CREATE also DONE,BUILD
-            for task_action in ("DELETE", "CREATE", "EXEC"):
-                db_vim_update = None
-                for task_index, task in enumerate(ro_task["tasks"]):
-                    target_update = None
-                    if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
-                            task["action"] != task_action or \
-                            (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
-                        continue
-                    task_path = "tasks.{}.status".format(task_index)
-                    try:
-                        if task["status"] == "SCHEDULED":
-                            task_depends = {}
-                            # check if tasks that this depends on have been completed
-                            dependency_not_completed = False
-                            for dependency_task_id in (task.get("depends_on") or ()):
-                                dependency_ro_task, dependency_task_index = \
-                                    self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
-                                dependency_task = dependency_ro_task["tasks"][dependency_task_index]
-                                if dependency_task["status"] == "SCHEDULED":
-                                    dependency_not_completed = True
-                                    next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
-                                    break
-                                elif dependency_task["status"] == "FAILED":
-                                    error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
-                                        task["action"], task["item"], dependency_task["action"],
-                                        dependency_task["item"], dependency_task_id,
-                                        dependency_ro_task["vim_info"].get("vim_details"))
-                                    self.logger.error("task={} {}".format(task["task_id"], error_text))
-                                    raise NsWorkerException(error_text)
-
-                                task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
-                                task_depends["TASK-{}".format(dependency_task_id)] = \
-                                    dependency_ro_task["vim_info"]["vim_id"]
-                            if dependency_not_completed:
-                                # TODO set at vim_info.vim_details that it is waiting
-                                continue
-
-                        if task["action"] == "DELETE":
-                            new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
-                                                                               task_depends, db_ro_task_update)
-                            new_status = "FINISHED" if new_status == "DONE" else new_status
-                            # ^with FINISHED instead of DONE it will not be refreshing
-                            if new_status in ("FINISHED", "SUPERSEDED"):
-                                target_update = "DELETE"
-                        elif task["action"] == "EXEC":
-                            self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update)
-                            new_status = "FINISHED" if new_status == "DONE" else new_status
-                            # ^with FINISHED instead of DONE it will not be refreshing
-                            if new_status in ("FINISHED", "SUPERSEDED"):
-                                target_update = "DELETE"
-                        elif task["action"] == "CREATE":
-                            if task["status"] == "SCHEDULED":
-                                if task_status_create:
-                                    new_status = task_status_create
-                                    target_update = "COPY_VIM_INFO"
-                                else:
-                                    new_status, db_vim_info_update = \
-                                        self.item2create[task["item"]](ro_task, task_index, task_depends)
-                                    # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
-                                    _update_refresh(new_status)
-                            else:
-                                if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
-                                    new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
-                                    _update_refresh(new_status)
-                    except Exception as e:
-                        new_status = "FAILED"
-                        db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
-                        if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
-                            self.logger.error("Unexpected exception at _delete_task task={}: {}".
-                                              format(task["task_id"], e), exc_info=True)
-
-                    try:
-                        if db_vim_info_update:
-                            db_vim_update = db_vim_info_update.copy()
-                            db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
-                            ro_task["vim_info"].update(db_vim_info_update)
+            if net_vim_id or ro_task["vim_info"]["created_items"]:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
 
-                        if new_status:
-                            if task_action == "CREATE":
-                                task_status_create = new_status
-                            db_ro_task_update[task_path] = new_status
-                        if target_update or db_vim_update:
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
 
-                            if target_update == "DELETE":
-                                self._update_target(task, None)
-                            elif target_update == "COPY_VIM_INFO":
-                                self._update_target(task, ro_task["vim_info"])
-                            else:
-                                self._update_target(task, db_vim_update)
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                        net_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
 
-                    except Exception as e:
-                        self.logger.error("Unexpected exception at _update_target task={}: {}".
-                                          format(task["task_id"], e), exc_info=True)
+        self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
+                                                            ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
 
-            # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
-            # outside this task (by ro_nbi) do not update it
-            db_ro_task_update["locked_by"] = None
-            # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
-            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
-            db_ro_task_update["to_check_at"] = next_check_at
-            if not self.db.set_one("ro_tasks",
-                                   update_dict=db_ro_task_update,
-                                   q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
-                                   fail_on_empty=False):
-                del db_ro_task_update["to_check_at"]
-                self.db.set_one("ro_tasks",
-                                q_filter={"_id": ro_task["_id"]},
-                                update_dict=db_ro_task_update,
-                                fail_on_empty=True)
-        except DbException as e:
-            self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
-        except Exception as e:
-            self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
 
-    def _update_target(self, task, ro_vim_item_update):
-        try:
-            table, _id, path = task["target_record"].split(":")
-            if ro_vim_item_update:
-                update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
-                               ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
-                if ro_vim_item_update.get("interfaces"):
-                    path_vdu = path[:path.rfind(".")]
-                    path_vdu = path_vdu[:path_vdu.rfind(".")]
-                    path_interfaces = path_vdu + ".interfaces"
-                    for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
-                        if iface:
-                            update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
-                                                k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
-                            if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
-                                update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
-                            if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
-                                update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]
-
-                self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
-            else:
-                self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
-                                unset={path: None})
-        except DbException as e:
-            self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
+class VimInteractionVdu(VimInteractionBase):
+    max_retries_inject_ssh_key = 20    # 20 times
+    time_retries_inject_ssh_key = 30   # wevery 30 seconds
 
-    def new_image(self, ro_task, task_index, task_depends):
+    def new(self, ro_task, task_index, task_depends):
         task = ro_task["tasks"][task_index]
         task_id = task["task_id"]
         created = False
         created_items = {}
         target_vim = self.my_vims[ro_task["target_id"]]
         try:
-            # FIND
-            if task.get("find_params"):
-                vim_images = target_vim.get_image_list(**task["find_params"])
-                if not vim_images:
-                    raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
-                        task["find_params"]))
-                elif len(vim_images) > 1:
-                    raise NsWorkerException(
-                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
-                else:
-                    vim_image_id = vim_images[0]["id"]
+            created = True
+            params = task["params"]
+            params_copy = deepcopy(params)
+            net_list = params_copy["net_list"]
+            for net in net_list:
+                if "net_id" in net and net["net_id"].startswith("TASK-"):  # change task_id into network_id
+                    network_id = task_depends[net["net_id"]]
+                    if not network_id:
+                        raise NsWorkerException("Cannot create VM because depends on a network not created or found "
+                                                "for {}".format(net["net_id"]))
+                    net["net_id"] = network_id
+            if params_copy["image_id"].startswith("TASK-"):
+                params_copy["image_id"] = task_depends[params_copy["image_id"]]
+            if params_copy["flavor_id"].startswith("TASK-"):
+                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
 
-            ro_vim_item_update = {"vim_id": vim_image_id,
-                                  "vim_status": "DONE",
+            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
+            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+
+            ro_vim_item_update = {"vim_id": vim_vm_id,
+                                  "vim_status": "BUILD",
                                   "created": created,
                                   "created_items": created_items,
-                                  "vim_details": None}
+                                  "vim_details": None,
+                                  "interfaces_vim_ids": interfaces,
+                                  "interfaces": [],
+                                  }
             self.logger.debug(
-                "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
-            return "DONE", ro_vim_item_update
-        except (NsWorkerException, vimconn.VimConnException) as e:
-            self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+                "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
+            return "BUILD", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e))
             ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                   "created": created,
                                   "vim_details": str(e)}
             return "FAILED", ro_vim_item_update
 
-    def del_flavor(self, ro_task, task_index):
+    def delete(self, ro_task, task_index):
         task = ro_task["tasks"][task_index]
         task_id = task["task_id"]
-        flavor_vim_id = ro_task["vim_info"]["vim_id"]
+        vm_vim_id = ro_task["vim_info"]["vim_id"]
         ro_vim_item_update_ok = {"vim_status": "DELETED",
                                  "created": False,
                                  "vim_details": "DELETED",
                                  "vim_id": None}
         try:
-            if flavor_vim_id:
+            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                 target_vim = self.my_vims[ro_task["target_id"]]
-                target_vim.delete_flavor(flavor_vim_id)
+                target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
 
         except vimconn.VimConnNotFoundException:
             ro_vim_item_update_ok["vim_details"] = "already deleted"
 
         except vimconn.VimConnException as e:
-            self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
-                ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
+            self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                       vm_vim_id, e))
             ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                   "vim_details": "Error while deleting: {}".format(e)}
             return "FAILED", ro_vim_item_update
 
-        self.logger.debug("task={} {} del-flavor={} {}".format(
-            task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+        self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
+                                                           ro_vim_item_update_ok.get("vim_details", "")))
         return "DONE", ro_vim_item_update_ok
 
-    def refresh_ok(self, ro_task):
-        """skip calling VIM to get image status. Assumes ok"""
-        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
-            return "FAILED", {}
-        return "DONE", {}
-
-    def delete_ok(self, ro_task):
-        """skip calling VIM to delete image status. Assumes ok"""
-        return "DONE", {}
-
-    def new_flavor(self, ro_task, task_index, task_depends):
-        task = ro_task["tasks"][task_index]
-        task_id = task["task_id"]
-        created = False
-        created_items = {}
-        target_vim = self.my_vims[ro_task["target_id"]]
-        try:
-            # FIND
-            vim_flavor_id = None
-            if task.get("find_params"):
-                try:
-                    flavor_data = task["find_params"]["flavor_data"]
-                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
-                except vimconn.VimConnNotFoundException:
-                    pass
-
-            if not vim_flavor_id and task.get("params"):
-                # CREATE
-                flavor_data = task["params"]["flavor_data"]
-                vim_flavor_id = target_vim.new_flavor(flavor_data)
-                created = True
-
-            ro_vim_item_update = {"vim_id": vim_flavor_id,
-                                  "vim_status": "DONE",
-                                  "created": created,
-                                  "created_items": created_items,
-                                  "vim_details": None}
-            self.logger.debug(
-                "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
-            return "DONE", ro_vim_item_update
-        except (vimconn.VimConnException, NsWorkerException) as e:
-            self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
-            ro_vim_item_update = {"vim_status": "VIM_ERROR",
-                                  "created": created,
-                                  "vim_details": str(e)}
-            return "FAILED", ro_vim_item_update
-
-    def new_net(self, ro_task, task_index, task_depends):
-        vim_net_id = None
-        task = ro_task["tasks"][task_index]
-        task_id = task["task_id"]
-        created = False
-        created_items = {}
-        target_vim = self.my_vims[ro_task["target_id"]]
-        try:
-            # FIND
-            if task.get("find_params"):
-                # if management, get configuration of VIM
-                if task["find_params"].get("filter_dict"):
-                    vim_filter = task["find_params"]["filter_dict"]
-                elif task["find_params"].get("mgmt"):   # mamagement network
-                    if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
-                        vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
-                    elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
-                        vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
-                    else:
-                        vim_filter = {"name": task["find_params"]["name"]}
-                else:
-                    raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
-
-                vim_nets = target_vim.get_network_list(vim_filter)
-                if not vim_nets and not task.get("params"):
-                    raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
-                        task.get("find_params")))
-                elif len(vim_nets) > 1:
-                    raise NsWorkerException(
-                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
-                if vim_nets:
-                    vim_net_id = vim_nets[0]["id"]
-            else:
-                # CREATE
-                params = task["params"]
-                vim_net_id, created_items = target_vim.new_network(**params)
-                created = True
-
-            ro_vim_item_update = {"vim_id": vim_net_id,
-                                  "vim_status": "BUILD",
-                                  "created": created,
-                                  "created_items": created_items,
-                                  "vim_details": None}
-            self.logger.debug(
-                "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
-            return "BUILD", ro_vim_item_update
-        except (vimconn.VimConnException, NsWorkerException) as e:
-            self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
-            ro_vim_item_update = {"vim_status": "VIM_ERROR",
-                                  "created": created,
-                                  "vim_details": str(e)}
-            return "FAILED", ro_vim_item_update
-
-    def refresh_net(self, ro_task):
-        """Call VIM to get network status"""
-        ro_task_id = ro_task["_id"]
+    def refresh(self, ro_task):
+        """Call VIM to get vm status"""
+        ro_task_id = ro_task["_id"]
         target_vim = self.my_vims[ro_task["target_id"]]
 
         vim_id = ro_task["vim_info"]["vim_id"]
-        net_to_refresh_list = [vim_id]
+        if not vim_id:
+            return None, None
+        vm_to_refresh_list = [vim_id]
         try:
-            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
             vim_info = vim_dict[vim_id]
             if vim_info["status"] == "ACTIVE":
                 task_status = "DONE"
@@ -688,20 +326,46 @@ class NsWorker(threading.Thread):
                 task_status = "BUILD"
             else:
                 task_status = "FAILED"
+            # try to load and parse vim_information
+            try:
+                vim_info_info = yaml.safe_load(vim_info["vim_info"])
+                if vim_info_info.get("name"):
+                    vim_info["name"] = vim_info_info["name"]
+            except Exception:
+                pass
         except vimconn.VimConnException as e:
             # Mark all tasks at VIM_ERROR status
-            self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+            self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
             vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
             task_status = "FAILED"
 
         ro_vim_item_update = {}
+        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
+        vim_interfaces = []
+        if vim_info.get("interfaces"):
+            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
+                iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]),
+                             None)
+                # if iface:
+                #     iface.pop("vim_info", None)
+                vim_interfaces.append(iface)
+
+        task_create = next(t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and t["status"] != "FINISHED")
+        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
+            vim_interfaces[task_create["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
+        mgmt_vdu_iface = task_create.get("mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0))
+        if vim_interfaces:
+            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+
+        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
+            ro_vim_item_update["interfaces"] = vim_interfaces
         if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
             ro_vim_item_update["vim_status"] = vim_info["status"]
         if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
             ro_vim_item_update["vim_name"] = vim_info.get("name")
         if vim_info["status"] in ("ERROR", "VIM_ERROR"):
-            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
-                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
+                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
         elif vim_info["status"] == "DELETED":
             ro_vim_item_update["vim_id"] = None
             ro_vim_item_update["vim_details"] = "Deleted externally"
@@ -709,207 +373,1114 @@ class NsWorker(threading.Thread):
             if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
         if ro_vim_item_update:
-            self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+            self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
                 ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
                 ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
         return task_status, ro_vim_item_update
 
-    def del_net(self, ro_task, task_index):
+    def exec(self, ro_task, task_index, task_depends):
         task = ro_task["tasks"][task_index]
         task_id = task["task_id"]
-        net_vim_id = ro_task["vim_info"]["vim_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+        db_task_update = {"retries": 0}
+        retries = task.get("retries", 0)
+        try:
+            params = task["params"]
+            params_copy = deepcopy(params)
+            params_copy["ro_key"] = self.db.decrypt(params_copy.pop("private_key"),
+                                                    params_copy.pop("schema_version"), params_copy.pop("salt"))
+            params_copy["ip_addr"] = params_copy.pop("ip_address")
+            target_vim.inject_user_key(**params_copy)
+            self.logger.debug(
+                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
+            return "DONE", None, db_task_update,    # params_copy["key"]
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            retries += 1
+            if retries < self.max_retries_inject_ssh_key:
+                return "BUILD", None, {"retries": retries, "next_retry": self.time_retries_inject_ssh_key}
+            self.logger.error("task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_details": str(e)}
+            return "FAILED", ro_vim_item_update, db_task_update
+
+
+class VimInteractionImage(VimInteractionBase):
+
+    def new(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            if task.get("find_params"):
+                vim_images = target_vim.get_image_list(**task["find_params"])
+                if not vim_images:
+                    raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
+                        task["find_params"]))
+                elif len(vim_images) > 1:
+                    raise NsWorkerException(
+                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
+                else:
+                    vim_image_id = vim_images[0]["id"]
+
+            ro_vim_item_update = {"vim_id": vim_image_id,
+                                  "vim_status": "DONE",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
+            return "DONE", ro_vim_item_update
+        except (NsWorkerException, vimconn.VimConnException) as e:
+            self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+
+class VimInteractionFlavor(VimInteractionBase):
+
+    def delete(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        flavor_vim_id = ro_task["vim_info"]["vim_id"]
         ro_vim_item_update_ok = {"vim_status": "DELETED",
                                  "created": False,
                                  "vim_details": "DELETED",
                                  "vim_id": None}
         try:
-            if net_vim_id or ro_task["vim_info"]["created_items"]:
+            if flavor_vim_id:
                 target_vim = self.my_vims[ro_task["target_id"]]
-                target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
+                target_vim.delete_flavor(flavor_vim_id)
 
         except vimconn.VimConnNotFoundException:
             ro_vim_item_update_ok["vim_details"] = "already deleted"
 
         except vimconn.VimConnException as e:
-            self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
-                                                                        net_vim_id, e))
+            self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
+                ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
             ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                   "vim_details": "Error while deleting: {}".format(e)}
             return "FAILED", ro_vim_item_update
 
-        self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
-                                                            ro_vim_item_update_ok.get("vim_details", "")))
+        self.logger.debug("task={} {} del-flavor={} {}".format(
+            task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def new(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            vim_flavor_id = None
+            if task.get("find_params"):
+                try:
+                    flavor_data = task["find_params"]["flavor_data"]
+                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
+                except vimconn.VimConnNotFoundException:
+                    pass
+
+            if not vim_flavor_id and task.get("params"):
+                # CREATE
+                flavor_data = task["params"]["flavor_data"]
+                vim_flavor_id = target_vim.new_flavor(flavor_data)
+                created = True
+
+            ro_vim_item_update = {"vim_id": vim_flavor_id,
+                                  "vim_status": "DONE",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
+            return "DONE", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+
+class VimInteractionSdnNet(VimInteractionBase):
+
+    @staticmethod
+    def _match_pci(port_pci, mapping):
+        """
+        Check if port_pci matches with mapping
+        mapping can have brackets to indicate that several chars are accepted. e.g
+        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
+        :param port_pci: text
+        :param mapping: text, can contain brackets to indicate several chars are available
+        :return: True if matches, False otherwise
+        """
+        if not port_pci or not mapping:
+            return False
+        if port_pci == mapping:
+            return True
+
+        mapping_index = 0
+        pci_index = 0
+        while True:
+            bracket_start = mapping.find("[", mapping_index)
+            if bracket_start == -1:
+                break
+            bracket_end = mapping.find("]", bracket_start)
+            if bracket_end == -1:
+                break
+            length = bracket_start - mapping_index
+            if length and port_pci[pci_index:pci_index + length] != mapping[mapping_index:bracket_start]:
+                return False
+            if port_pci[pci_index + length] not in mapping[bracket_start+1:bracket_end]:
+                return False
+            pci_index += length + 1
+            mapping_index = bracket_end + 1
+
+        if port_pci[pci_index:] != mapping[mapping_index:]:
+            return False
+        return True
+
+    def _get_interfaces(self, vlds_to_connect, vim_account_id):
+        """
+        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
+        :param vim_account_id:
+        :return:
+        """
+        interfaces = []
+        for vld in vlds_to_connect:
+            table, _, db_id = vld.partition(":")
+            db_id, _, vld = db_id.partition(":")
+            _, _, vld_id = vld.partition(".")
+            if table == "vnfrs":
+                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
+                iface_key = "vnf-vld-id"
+            else:  # table == "nsrs"
+                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
+                iface_key = "ns-vld-id"
+            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
+            for db_vnfr in db_vnfrs:
+                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
+                    for iface_index, interface in enumerate(vdur["interfaces"]):
+                        if interface.get(iface_key) == vld_id and \
+                                interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+                            # only SR-IOV o PT
+                            interface_ = interface.copy()
+                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(db_vnfr["_id"], vdu_index,
+                                                                                      iface_index)
+                            if vdur.get("status") == "ERROR":
+                                interface_["status"] = "ERROR"
+                            interfaces.append(interface_)
+        return interfaces
+
+    def refresh(self, ro_task):
+        # look for task create
+        task_create_index, _ = next(i_t for i_t in enumerate(ro_task["tasks"])
+                                    if i_t[1] and i_t[1]["action"] == "CREATE" and i_t[1]["status"] != "FINISHED")
+
+        return self.new(ro_task, task_create_index, None)
+
+    def new(self, ro_task, task_index, task_depends):
+
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+
+        sdn_net_id = ro_task["vim_info"]["vim_id"]
+
+        created_items = ro_task["vim_info"].get("created_items")
+        connected_ports = ro_task["vim_info"].get("connected_ports", [])
+        new_connected_ports = []
+        last_update = ro_task["vim_info"].get("last_update", 0)
+        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
+        error_list = []
+        created = ro_task["vim_info"].get("created", False)
+
+        try:
+
+            # CREATE
+            params = task["params"]
+            vlds_to_connect = params["vlds"]
+            associated_vim = params["target_vim"]
+            additional_ports = params.get("sdn-ports") or ()  # external additional ports
+            _, _, vim_account_id = associated_vim.partition(":")
+            if associated_vim:
+                # get associated VIM
+                if associated_vim not in self.db_vims:
+                    self.db_vims[associated_vim] = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+                db_vim = self.db_vims[associated_vim]
+
+            # look for ports to connect
+            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
+            # print(ports)
+
+            sdn_ports = []
+            pending_ports = error_ports = 0
+            vlan_used = None
+            sdn_need_update = False
+            for port in ports:
+                vlan_used = port.get("vlan") or vlan_used
+                # TODO. Do not connect if already done
+                if not port.get("compute_node") or not port.get("pci"):
+                    if port.get("status") == "ERROR":
+                        error_ports += 1
+                    else:
+                        pending_ports += 1
+                    continue
+                pmap = None
+                compute_node_mappings = next((c for c in db_vim["config"].get("sdn-port-mapping", ())
+                                             if c and c["compute_node"] == port["compute_node"]), None)
+                if compute_node_mappings:
+                    # process port_mapping pci of type 0000:af:1[01].[1357]
+                    pmap = next((p for p in compute_node_mappings["ports"]
+                                 if self._match_pci(port["pci"], p.get("pci"))), None)
+                if not pmap:
+                    if not db_vim["config"].get("mapping_not_needed"):
+                        error_list.append("Port mapping not found for compute_node={} pci={}".format(
+                            port["compute_node"], port["pci"]))
+                        continue
+                    pmap = {}
+
+                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
+                new_port = {
+                    "service_endpoint_id": pmap.get("service_endpoint_id") or service_endpoint_id,
+                    "service_endpoint_encapsulation_type": "dot1q" if port["type"] == "SR-IOV" else None,
+                    "service_endpoint_encapsulation_info": {
+                        "vlan": port.get("vlan"),
+                        "mac": port.get("mac_address"),
+                        "device_id": pmap.get("device_id") or port["compute_node"],  # device_id
+                        "device_interface_id": pmap.get("device_interface_id") or port["pci"],
+                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
+                        "switch_port": pmap.get("switch_port"),
+                        "service_mapping_info": pmap.get("service_mapping_info"),
+                    }
+                }
+
+                # TODO
+                # if port["modified_at"] > last_update:
+                #     sdn_need_update = True
+                new_connected_ports.append(port["id"])  # TODO
+                sdn_ports.append(new_port)
+
+            if error_ports:
+                error_list.append("{} interfaces have not been created as VDU is on ERROR status".format(error_ports))
+
+            # connect external ports
+            for index, additional_port in enumerate(additional_ports):
+                additional_port_id = additional_port.get("service_endpoint_id") or "external-{}".format(index)
+                sdn_ports.append({
+                    "service_endpoint_id": additional_port_id,
+                    "service_endpoint_encapsulation_type": additional_port.get("service_endpoint_encapsulation_type",
+                                                                               "dot1q"),
+                    "service_endpoint_encapsulation_info": {
+                        "vlan": additional_port.get("vlan") or vlan_used,
+                        "mac": additional_port.get("mac_address"),
+                        "device_id": additional_port.get("device_id"),
+                        "device_interface_id": additional_port.get("device_interface_id"),
+                        "switch_dpid": additional_port.get("switch_dpid") or additional_port.get("switch_id"),
+                        "switch_port": additional_port.get("switch_port"),
+                        "service_mapping_info": additional_port.get("service_mapping_info"),
+                    }})
+                new_connected_ports.append(additional_port_id)
+            sdn_info = ""
+            # if there are more ports to connect or they have been modified, call create/update
+            if error_list:
+                sdn_status = "ERROR"
+                sdn_info = "; ".join(error_list)
+            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
+                last_update = time.time()
+                if not sdn_net_id:
+                    if len(sdn_ports) < 2:
+                        sdn_status = "ACTIVE"
+                        if not pending_ports:
+                            self.logger.debug("task={} {} new-sdn-net done, less than 2 ports".
+                                              format(task_id, ro_task["target_id"]))
+                    else:
+                        net_type = params.get("type") or "ELAN"
+                        sdn_net_id, created_items = target_vim.create_connectivity_service(
+                            net_type, sdn_ports)
+                        created = True
+                        self.logger.debug("task={} {} new-sdn-net={} created={}".
+                                          format(task_id, ro_task["target_id"], sdn_net_id, created))
+                else:
+                    created_items = target_vim.edit_connectivity_service(
+                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports)
+                    created = True
+                    self.logger.debug("task={} {} update-sdn-net={} created={}".
+                                      format(task_id, ro_task["target_id"], sdn_net_id, created))
+                connected_ports = new_connected_ports
+            elif sdn_net_id:
+                wim_status_dict = target_vim.get_connectivity_service_status(sdn_net_id, conn_info=created_items)
+                sdn_status = wim_status_dict["sdn_status"]
+                if wim_status_dict.get("sdn_info"):
+                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""
+                if wim_status_dict.get("error_msg"):
+                    sdn_info = wim_status_dict.get("error_msg") or ""
+
+            if pending_ports:
+                if sdn_status != "ERROR":
+                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
+                        len(ports)-pending_ports, len(ports))
+                if sdn_status == "ACTIVE":
+                    sdn_status = "BUILD"
+
+            ro_vim_item_update = {"vim_id": sdn_net_id,
+                                  "vim_status": sdn_status,
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "connected_ports": connected_ports,
+                                  "vim_details": sdn_info,
+                                  "last_update": last_update}
+            return sdn_status, ro_vim_item_update
+        except Exception as e:
+            self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
+                              exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException)))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def delete(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        sdn_vim_id = ro_task["vim_info"].get("vim_id")
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if sdn_vim_id:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_connectivity_service(sdn_vim_id, ro_task["vim_info"].get("created_items"))
+
+        except Exception as e:
+            if isinstance(e, sdnconn.SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value:
+                ro_vim_item_update_ok["vim_details"] = "already deleted"
+            else:
+                self.logger.error("ro_task={} vim={} del-sdn-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                                sdn_vim_id, e),
+                                  exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException)))
+                ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                      "vim_details": "Error while deleting: {}".format(e)}
+                return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-sdn-net={} {}".format(task_id, ro_task["target_id"], sdn_vim_id,
+                                                                ro_vim_item_update_ok.get("vim_details", "")))
         return "DONE", ro_vim_item_update_ok
 
-    def new_vm(self, ro_task, task_index, task_depends):
-        task = ro_task["tasks"][task_index]
-        task_id = task["task_id"]
-        created = False
-        created_items = {}
-        target_vim = self.my_vims[ro_task["target_id"]]
-        try:
-            created = True
-            params = task["params"]
-            params_copy = deepcopy(params)
-            net_list = params_copy["net_list"]
-            for net in net_list:
-                if "net_id" in net and net["net_id"].startswith("TASK-"):  # change task_id into network_id
-                    network_id = task_depends[net["net_id"]]
-                    if not network_id:
-                        raise NsWorkerException("Cannot create VM because depends on a network not created or found "
-                                                "for {}".format(net["net_id"]))
-                    net["net_id"] = network_id
-            if params_copy["image_id"].startswith("TASK-"):
-                params_copy["image_id"] = task_depends[params_copy["image_id"]]
-            if params_copy["flavor_id"].startswith("TASK-"):
-                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
 
-            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
-            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+class NsWorker(threading.Thread):
+    REFRESH_BUILD = 5  # 5 seconds
+    REFRESH_ACTIVE = 60  # 1 minute
+    REFRESH_ERROR = 600
+    REFRESH_IMAGE = 3600 * 10
+    REFRESH_DELETE = 3600 * 10
+    QUEUE_SIZE = 2000
+    # TODO delete assigment_lock = Lock()
+    terminate = False
+    # TODO delete assignment = {}
+    MAX_TIME_LOCKED = 3600
+    MAX_TIME_VIM_LOCKED = 120
+
+    def __init__(self, worker_index, config, plugins, db):
+        """
+
+        :param worker_index: thread index
+        :param config: general configuration of RO, among others the process_id with the docker id where it runs
+        :param plugins: global shared dict with the loaded plugins
+        :param db: database class instance to use
+        """
+        threading.Thread.__init__(self)
+        self.config = config
+        self.plugins = plugins
+        self.plugin_name = "unknown"
+        self.logger = logging.getLogger('ro.worker{}'.format(worker_index))
+        self.worker_index = worker_index
+        self.task_queue = queue.Queue(self.QUEUE_SIZE)
+        self.my_vims = {}   # targetvim: vimplugin class
+        self.db_vims = {}   # targetvim: vim information from database
+        self.vim_targets = []   # targetvim list
+        self.my_id = config["process_id"] + ":" + str(worker_index)
+        self.db = db
+        self.item2class = {
+            "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
+            "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
+            "image": VimInteractionImage(self.db, self.my_vims, self.db_vims, self.logger),
+            "flavor": VimInteractionFlavor(self.db, self.my_vims, self.db_vims, self.logger),
+            "sdn_net": VimInteractionSdnNet(self.db, self.my_vims, self.db_vims, self.logger),
+        }
+        self.time_last_task_processed = None
+        self.tasks_to_delete = []  # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+
+    def insert_task(self, task):
+        try:
+            self.task_queue.put(task, False)
+            return None
+        except queue.Full:
+            raise NsWorkerException("timeout inserting a task")
+
+    def terminate(self):
+        self.insert_task("exit")
+
+    def del_task(self, task):
+        with self.task_lock:
+            if task["status"] == "SCHEDULED":
+                task["status"] = "SUPERSEDED"
+                return True
+            else:  # task["status"] == "processing"
+                self.task_lock.release()
+                return False
+
+    def _process_vim_config(self, target_id, db_vim):
+        """
+        Process vim config, creating vim configuration files as ca_cert
+        :param target_id: vim/sdn/wim + id
+        :param db_vim: Vim dictionary obtained from database
+        :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
+        """
+        if not db_vim.get("config"):
+            return
+        file_name = ""
+        try:
+            if db_vim["config"].get("ca_cert_content"):
+                file_name = "{}:{}".format(target_id, self.worker_index)
+                try:
+                    mkdir(file_name)
+                except FileExistsError:
+                    pass
+                file_name = file_name + "/ca_cert"
+                with open(file_name, "w") as f:
+                    f.write(db_vim["config"]["ca_cert_content"])
+                    del db_vim["config"]["ca_cert_content"]
+                    db_vim["config"]["ca_cert"] = file_name
+        except Exception as e:
+            raise NsWorkerException("Error writing to file '{}': {}".format(file_name, e))
+
+    def _load_plugin(self, name, type="vim"):
+        # type can be vim or sdn
+        if "rovim_dummy" not in self.plugins:
+            self.plugins["rovim_dummy"] = VimDummyConnector
+        if "rosdn_dummy" not in self.plugins:
+            self.plugins["rosdn_dummy"] = SdnDummyConnector
+        if name in self.plugins:
+            return self.plugins[name]
+        try:
+            for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+                self.plugins[name] = v.load()
+        except Exception as e:
+            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
+        if name and name not in self.plugins:
+            raise NsWorkerException("Plugin 'osm_{n}' has not been installed".format(n=name))
+        return self.plugins[name]
+
+    def _unload_vim(self, target_id):
+        """
+        Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
+        :param target_id: Contains type:_id; where type can be 'vim', ...
+        :return: None.
+        """
+        try:
+            target, _, _id = target_id.partition(":")
+            self.db_vims.pop(target_id, None)
+            self.my_vims.pop(target_id, None)
+            self.vim_targets.remove(target_id)
+            rmtree("{}:{}".format(target_id, self.worker_index))
+        except FileNotFoundError:
+            pass  # this is raised by rmtree if folder does not exist
+        except Exception as e:
+            self.logger.error("Cannot unload {}: {}".format(target_id, e))
+
+    def _check_vim(self, target_id):
+        """
+        Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
+        :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
+        :return: None.
+        """
+        target, _, _id = target_id.partition(":")
+        now = time.time()
+        update_dict = {}
+        unset_dict = {}
+        op_text = ""
+        step = ""
+        loaded = target_id in self.my_vims
+        target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
+        try:
+            step = "Getting {} from db".format(target_id)
+            db_vim = self.db.get_one(target_database, {"_id": _id})
+            for op_index, operation in enumerate(db_vim["_admin"].get("operations", ())):
+                if operation["operationState"] != "PROCESSING":
+                    continue
+                locked_at = operation.get("locked_at")
+                if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+                    # some other thread is doing this operation
+                    return
+                # lock
+                op_text = "_admin.operations.{}.".format(op_index)
+                if not self.db.set_one(target_database,
+                                       q_filter={"_id": _id,
+                                                 op_text + "operationState": "PROCESSING",
+                                                 op_text + "locked_at": locked_at
+                                                 },
+                                       update_dict={op_text + "locked_at": now,
+                                                    "admin.current_operation": op_index},
+                                       fail_on_empty=False):
+                    return
+                unset_dict[op_text + "locked_at"] = None
+                unset_dict["current_operation"] = None
+                step = "Loading " + target_id
+                error_text = self._load_vim(target_id)
+                if not error_text:
+                    step = "Checking connectivity"
+                    if target == 'vim':
+                        self.my_vims[target_id].check_vim_connectivity()
+                    else:
+                        self.my_vims[target_id].check_credentials()
+                update_dict["_admin.operationalState"] = "ENABLED"
+                update_dict["_admin.detailed-status"] = ""
+                unset_dict[op_text + "detailed-status"] = None
+                update_dict[op_text + "operationState"] = "COMPLETED"
+                return
+
+        except Exception as e:
+            error_text = "{}: {}".format(step, e)
+            self.logger.error("{} for {}: {}".format(step, target_id, e))
+
+        finally:
+            if update_dict or unset_dict:
+                if error_text:
+                    update_dict[op_text + "operationState"] = "FAILED"
+                    update_dict[op_text + "detailed-status"] = error_text
+                    unset_dict.pop(op_text + "detailed-status", None)
+                    update_dict["_admin.operationalState"] = "ERROR"
+                    update_dict["_admin.detailed-status"] = error_text
+                if op_text:
+                    update_dict[op_text + "statusEnteredTime"] = now
+                self.db.set_one(target_database, q_filter={"_id": _id}, update_dict=update_dict, unset=unset_dict,
+                                fail_on_empty=False)
+            if not loaded:
+                self._unload_vim(target_id)
+
+    def _reload_vim(self, target_id):
+        if target_id in self.vim_targets:
+            self._load_vim(target_id)
+        else:
+            # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
+            # just remove it to force load again next time it is needed
+            self.db_vims.pop(target_id, None)
+
+    def _load_vim(self, target_id):
+        """
+        Load or reload a vim_account, sdn_controller or wim_account.
+        Read content from database, load the plugin if not loaded.
+        In case of error loading the plugin, it load a failing VIM_connector
+        It fills self db_vims dictionary, my_vims dictionary and vim_targets list
+        :param target_id: Contains type:_id; where type can be 'vim', ...
+        :return: None if ok, descriptive text if error
+        """
+        target, _, _id = target_id.partition(":")
+        target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
+        plugin_name = ""
+        vim = None
+        try:
+            step = "Getting {}={} from db".format(target, _id)
+            # TODO process for wim, sdnc, ...
+            vim = self.db.get_one(target_database, {"_id": _id})
+
+            # if deep_get(vim, "config", "sdn-controller"):
+            #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
+            #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
+
+            step = "Decrypting password"
+            schema_version = vim.get("schema_version")
+            self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
+                                           schema_version=schema_version, salt=_id)
+            self._process_vim_config(target_id, vim)
+            if target == "vim":
+                plugin_name = "rovim_" + vim["vim_type"]
+                step = "Loading plugin '{}'".format(plugin_name)
+                vim_module_conn = self._load_plugin(plugin_name)
+                step = "Loading {}'".format(target_id)
+                self.my_vims[target_id] = vim_module_conn(
+                    uuid=vim['_id'], name=vim['name'],
+                    tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
+                    url=vim['vim_url'], url_admin=None,
+                    user=vim['vim_user'], passwd=vim['vim_password'],
+                    config=vim.get('config') or {}, persistent_info={}
+                )
+            else:  # sdn
+                plugin_name = "rosdn_" + vim["type"]
+                step = "Loading plugin '{}'".format(plugin_name)
+                vim_module_conn = self._load_plugin(plugin_name, "sdn")
+                step = "Loading {}'".format(target_id)
+                wim = deepcopy(vim)
+                wim_config = wim.pop("config", {}) or {}
+                wim["uuid"] = wim["_id"]
+                wim["wim_url"] = wim["url"]
+                if wim.get("dpid"):
+                    wim_config["dpid"] = wim.pop("dpid")
+                if wim.get("switch_id"):
+                    wim_config["switch_id"] = wim.pop("switch_id")
+                self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)  # wim, wim_account, config
+            self.db_vims[target_id] = vim
+            self.error_status = None
+            self.logger.info("Connector loaded for {}, plugin={}".format(target_id, plugin_name))
+        except Exception as e:
+            self.logger.error("Cannot load {} plugin={}: {} {}".format(
+                target_id, plugin_name, step, e))
+            self.db_vims[target_id] = vim or {}
+            self.db_vims[target_id] = FailingConnector(str(e))
+            error_status = "{} Error: {}".format(step, e)
+            return error_status
+        finally:
+            if target_id not in self.vim_targets:
+                self.vim_targets.append(target_id)
+
+    def _get_db_task(self):
+        """
+        Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
+        :return: None
+        """
+        now = time.time()
+        if not self.time_last_task_processed:
+            self.time_last_task_processed = now
+        try:
+            while True:
+                locked = self.db.set_one(
+                    "ro_tasks",
+                    q_filter={"target_id": self.vim_targets,
+                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                              "to_check_at.lt": self.time_last_task_processed},
+                    update_dict={"locked_by": self.my_id, "locked_at": now},
+                    fail_on_empty=False)
+                if locked:
+                    # read and return
+                    ro_task = self.db.get_one(
+                        "ro_tasks",
+                        q_filter={"target_id": self.vim_targets,
+                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                  "locked_at": now})
+                    return ro_task
+                if self.time_last_task_processed == now:
+                    self.time_last_task_processed = None
+                    return None
+                else:
+                    self.time_last_task_processed = now
+                    # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
+
+        except DbException as e:
+            self.logger.error("Database exception at _get_db_task: {}".format(e))
+        except Exception as e:
+            self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
+        return None
+
+    def _delete_task(self, ro_task, task_index, task_depends, db_update):
+        """
+        Determine if this task need to be done or superseded
+        :return: None
+        """
+        my_task = ro_task["tasks"][task_index]
+        task_id = my_task["task_id"]
+        needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
+        if my_task["status"] == "FAILED":
+            return None, None  # TODO need to be retry??
+        try:
+            for index, task in enumerate(ro_task["tasks"]):
+                if index == task_index or not task:
+                    continue  # own task
+                if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
+                    # set to finished
+                    db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
+                elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
+                    needed_delete = False
+            if needed_delete:
+                return self.item2class[my_task["item"]].delete(ro_task, task_index)
+            else:
+                return "SUPERSEDED", None
+        except Exception as e:
+            if not isinstance(e, NsWorkerException):
+                self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
+                                     exc_info=True)
+            return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+
+    def _create_task(self, ro_task, task_index, task_depends, db_update):
+        """
+        Determine if this task need to create something at VIM
+        :return: None
+        """
+        my_task = ro_task["tasks"][task_index]
+        task_id = my_task["task_id"]
+        task_status = None
+        if my_task["status"] == "FAILED":
+            return None, None  # TODO need to be retry??
+        elif my_task["status"] == "SCHEDULED":
+            # check if already created by another task
+            for index, task in enumerate(ro_task["tasks"]):
+                if index == task_index or not task:
+                    continue  # own task
+                if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
+                    return task["status"], "COPY_VIM_INFO"
+
+            try:
+                task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
+                    ro_task, task_index, task_depends)
+                # TODO update other CREATE tasks
+            except Exception as e:
+                if not isinstance(e, NsWorkerException):
+                    self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
+                task_status = "FAILED"
+                ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                # TODO update    ro_vim_item_update
+            return task_status, ro_vim_item_update
+        else:
+            return None, None
+
+    def _get_dependency(self, task_id, ro_task=None, target_id=None):
+        """
+        Look for dependency task
+        :param task_id: Can be one of
+            1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
+            2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
+            3. task.task_id: "<action_id>:number"
+        :param ro_task:
+        :param target_id:
+        :return: database ro_task plus index of task
+        """
+        if task_id.startswith("vim:") or task_id.startswith("sdn:") or task_id.startswith("wim:"):
+            target_id, _, task_id = task_id.partition(" ")
+
+        if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
+            ro_task_dependency = self.db.get_one(
+                "ro_tasks",
+                q_filter={"target_id": target_id,
+                          "tasks.target_record_id": task_id
+                          },
+                fail_on_empty=False)
+            if ro_task_dependency:
+                for task_index, task in enumerate(ro_task_dependency["tasks"]):
+                    if task["target_record_id"] == task_id:
+                        return ro_task_dependency, task_index
+
+        else:
+            if ro_task:
+                for task_index, task in enumerate(ro_task["tasks"]):
+                    if task and task["task_id"] == task_id:
+                        return ro_task, task_index
+            ro_task_dependency = self.db.get_one(
+                "ro_tasks",
+                q_filter={"tasks.ANYINDEX.task_id": task_id,
+                          "tasks.ANYINDEX.target_record.ne": None
+                          },
+                fail_on_empty=False)
+            if ro_task_dependency:
+                for task_index, task in ro_task_dependency["tasks"]:
+                    if task["task_id"] == task_id:
+                        return ro_task_dependency, task_index
+        raise NsWorkerException("Cannot get depending task {}".format(task_id))
+
+    def _process_pending_tasks(self, ro_task):
+        ro_task_id = ro_task["_id"]
+        now = time.time()
+        next_check_at = now + (24*60*60)   # one day
+        db_ro_task_update = {}
+
+        def _update_refresh(new_status):
+            # compute next_refresh
+            nonlocal task
+            nonlocal next_check_at
+            nonlocal db_ro_task_update
+            nonlocal ro_task
 
-            ro_vim_item_update = {"vim_id": vim_vm_id,
-                                  "vim_status": "BUILD",
-                                  "created": created,
-                                  "created_items": created_items,
-                                  "vim_details": None,
-                                  "interfaces_vim_ids": interfaces,
-                                  "interfaces": [],
-                                  }
-            self.logger.debug(
-                "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
-            return "BUILD", ro_vim_item_update
-        except (vimconn.VimConnException, NsWorkerException) as e:
-            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
-            ro_vim_item_update = {"vim_status": "VIM_ERROR",
-                                  "created": created,
-                                  "vim_details": str(e)}
-            return "FAILED", ro_vim_item_update
+            next_refresh = time.time()
+            if task["item"] in ("image", "flavor"):
+                next_refresh += self.REFRESH_IMAGE
+            elif new_status == "BUILD":
+                next_refresh += self.REFRESH_BUILD
+            elif new_status == "DONE":
+                next_refresh += self.REFRESH_ACTIVE
+            else:
+                next_refresh += self.REFRESH_ERROR
+            next_check_at = min(next_check_at, next_refresh)
+            db_ro_task_update["vim_info.refresh_at"] = next_refresh
+            ro_task["vim_info"]["refresh_at"] = next_refresh
 
-    def del_vm(self, ro_task, task_index):
-        task = ro_task["tasks"][task_index]
-        task_id = task["task_id"]
-        vm_vim_id = ro_task["vim_info"]["vim_id"]
-        ro_vim_item_update_ok = {"vim_status": "DELETED",
-                                 "created": False,
-                                 "vim_details": "DELETED",
-                                 "vim_id": None}
         try:
-            if vm_vim_id or ro_task["vim_info"]["created_items"]:
-                target_vim = self.my_vims[ro_task["target_id"]]
-                target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
+            # 0 get task_status_create
+            task_status_create = None
+            task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
+                                t["status"] in ("BUILD", "DONE")), None)
+            if task_create:
+                task_status_create = task_create["status"]
+            # 1. look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
+            for task_action in ("DELETE", "CREATE", "EXEC"):
+                db_vim_update = None
+                new_status = None
+                for task_index, task in enumerate(ro_task["tasks"]):
+                    if not task:
+                        continue  # task deleted
+                    target_update = None
+                    if (task_action in ("DELETE", "EXEC") and task["status"] not in ("SCHEDULED", "BUILD")) or \
+                            task["action"] != task_action or \
+                            (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
+                        continue
+                    task_path = "tasks.{}.status".format(task_index)
+                    try:
+                        db_vim_info_update = None
+                        if task["status"] == "SCHEDULED":
+                            task_depends = {}
+                            # check if tasks that this depends on have been completed
+                            dependency_not_completed = False
+                            for dependency_task_id in (task.get("depends_on") or ()):
+                                dependency_ro_task, dependency_task_index = \
+                                    self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
+                                dependency_task = dependency_ro_task["tasks"][dependency_task_index]
+                                if dependency_task["status"] == "SCHEDULED":
+                                    dependency_not_completed = True
+                                    next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
+                                    break
+                                elif dependency_task["status"] == "FAILED":
+                                    error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
+                                        task["action"], task["item"], dependency_task["action"],
+                                        dependency_task["item"], dependency_task_id,
+                                        dependency_ro_task["vim_info"].get("vim_details"))
+                                    self.logger.error("task={} {}".format(task["task_id"], error_text))
+                                    raise NsWorkerException(error_text)
 
-        except vimconn.VimConnNotFoundException:
-            ro_vim_item_update_ok["vim_details"] = "already deleted"
+                                task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
+                                task_depends["TASK-{}".format(dependency_task_id)] = \
+                                    dependency_ro_task["vim_info"]["vim_id"]
+                            if dependency_not_completed:
+                                # TODO set at vim_info.vim_details that it is waiting
+                                continue
 
-        except vimconn.VimConnException as e:
-            self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
-                                                                       vm_vim_id, e))
-            ro_vim_item_update = {"vim_status": "VIM_ERROR",
-                                  "vim_details": "Error while deleting: {}".format(e)}
-            return "FAILED", ro_vim_item_update
+                        if task["action"] == "DELETE":
+                            new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
+                                                                               task_depends, db_ro_task_update)
+                            new_status = "FINISHED" if new_status == "DONE" else new_status
+                            # ^with FINISHED instead of DONE it will not be refreshing
+                            if new_status in ("FINISHED", "SUPERSEDED"):
+                                target_update = "DELETE"
+                        elif task["action"] == "EXEC":
+                            new_status, db_vim_info_update, db_task_update = self.item2class[task["item"]].exec(
+                                ro_task, task_index, task_depends)
+                            new_status = "FINISHED" if new_status == "DONE" else new_status
+                            # ^with FINISHED instead of DONE it will not be refreshing
+                            if db_task_update:
+                                # load into database the modified db_task_update "retries" and "next_retry"
+                                if db_task_update.get("retries"):
+                                    db_ro_task_update["tasks.{}.retries".format(task_index)] = db_task_update["retries"]
+                                next_check_at = time.time() + db_task_update.get("next_retry", 60)
+                            target_update = None
+                        elif task["action"] == "CREATE":
+                            if task["status"] == "SCHEDULED":
+                                if task_status_create:
+                                    new_status = task_status_create
+                                    target_update = "COPY_VIM_INFO"
+                                else:
+                                    new_status, db_vim_info_update = \
+                                        self.item2class[task["item"]].new(ro_task, task_index, task_depends)
+                                    # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
+                                    _update_refresh(new_status)
+                            else:
+                                if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
+                                    new_status, db_vim_info_update = self.item2class[task["item"]].refresh(ro_task)
+                                    _update_refresh(new_status)
+                    except Exception as e:
+                        new_status = "FAILED"
+                        db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                        if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
+                            self.logger.error("Unexpected exception at _delete_task task={}: {}".
+                                              format(task["task_id"], e), exc_info=True)
 
-        self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
-                                                           ro_vim_item_update_ok.get("vim_details", "")))
-        return "DONE", ro_vim_item_update_ok
+                    try:
+                        if db_vim_info_update:
+                            db_vim_update = db_vim_info_update.copy()
+                            db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
+                            ro_task["vim_info"].update(db_vim_info_update)
 
-    def refresh_vm(self, ro_task):
-        """Call VIM to get vm status"""
-        ro_task_id = ro_task["_id"]
-        target_vim = self.my_vims[ro_task["target_id"]]
+                        if new_status:
+                            if task_action == "CREATE":
+                                task_status_create = new_status
+                            db_ro_task_update[task_path] = new_status
+                        if target_update or db_vim_update:
 
-        vim_id = ro_task["vim_info"]["vim_id"]
-        if not vim_id:
-            return None, None
-        vm_to_refresh_list = [vim_id]
-        try:
-            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
-            vim_info = vim_dict[vim_id]
-            if vim_info["status"] == "ACTIVE":
-                task_status = "DONE"
-            elif vim_info["status"] == "BUILD":
-                task_status = "BUILD"
-            else:
-                task_status = "FAILED"
-        except vimconn.VimConnException as e:
-            # Mark all tasks at VIM_ERROR status
-            self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
-            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
-            task_status = "FAILED"
+                            if target_update == "DELETE":
+                                self._update_target(task, None)
+                            elif target_update == "COPY_VIM_INFO":
+                                self._update_target(task, ro_task["vim_info"])
+                            else:
+                                self._update_target(task, db_vim_update)
 
-        ro_vim_item_update = {}
-        # TODO check and update interfaces
-        vim_interfaces = []
-        for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
-            iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None)
-            # if iface:
-            #     iface.pop("vim_info", None)
-            vim_interfaces.append(iface)
-
-        task = ro_task["tasks"][0]  # TODO look for a task CREATE and active
-        if task.get("mgmt_vnf_interface") is not None:
-            vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
-        mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
-        vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+                    except Exception as e:
+                        if isinstance(e, DbException) and e.http_code == HTTPStatus.NOT_FOUND:
+                            # if the vnfrs or nsrs has been removed from database, this task must be removed
+                            self.logger.debug("marking to delete task={}".format(task["task_id"]))
+                            self.tasks_to_delete.append(task)
+                        else:
+                            self.logger.error("Unexpected exception at _update_target task={}: {}".
+                                              format(task["task_id"], e), exc_info=True)
 
-        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
-            ro_vim_item_update["interfaces"] = vim_interfaces
-        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
-            ro_vim_item_update["vim_status"] = vim_info["status"]
-        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
-            ro_vim_item_update["vim_name"] = vim_info.get("name")
-        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
-            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
-                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
-        elif vim_info["status"] == "DELETED":
-            ro_vim_item_update["vim_id"] = None
-            ro_vim_item_update["vim_details"] = "Deleted externally"
-        else:
-            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
-                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+            # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
+            # outside this task (by ro_nbi) do not update it
+            db_ro_task_update["locked_by"] = None
+            # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
+            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+            db_ro_task_update["to_check_at"] = next_check_at
+            if not self.db.set_one("ro_tasks",
+                                   update_dict=db_ro_task_update,
+                                   q_filter=q_filter,
+                                   fail_on_empty=False):
+                del db_ro_task_update["to_check_at"]
+                del q_filter["to_check_at"]
+                self.db.set_one("ro_tasks",
+                                q_filter=q_filter,
+                                update_dict=db_ro_task_update,
+                                fail_on_empty=True)
+        except DbException as e:
+            self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
+        except Exception as e:
+            self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
+
+    def _update_target(self, task, ro_vim_item_update):
+        table, _, temp = task["target_record"].partition(":")
+        _id, _, path_vim_status = temp.partition(":")
+        path_item = path_vim_status[:path_vim_status.rfind(".")]
+        path_item = path_item[:path_item.rfind(".")]
+        # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
+        # path_item: dot separated list targeting record information, e.g. "vdur.10"
         if ro_vim_item_update:
-            self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
-                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
-                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
-        return task_status, ro_vim_item_update
+            update_dict = {path_vim_status + "." + k: v for k, v in ro_vim_item_update.items() if k in
+                           ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
+            if path_vim_status.startswith("vdur."):
+                # for backward compatibility, add vdur.name apart from vdur.vim_name
+                if ro_vim_item_update.get("vim_name"):
+                    update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
+                # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
+                if ro_vim_item_update.get("vim_id"):
+                    update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
+                # update general status
+                if ro_vim_item_update.get("vim_status"):
+                    update_dict[path_item + ".status"] = ro_vim_item_update["vim_status"]
+            if ro_vim_item_update.get("interfaces"):
+                path_interfaces = path_item + ".interfaces"
+                for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
+                    if iface:
+                        update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
+                                            k in ('vlan', 'compute_node', 'pci')})
+                        # put ip_address and mac_address with ip-address and mac-address
+                        if iface.get('ip_address'):
+                            update_dict[path_interfaces + ".{}.".format(i) + "ip-address"] = iface['ip_address']
+                        if iface.get('mac_address'):
+                            update_dict[path_interfaces + ".{}.".format(i) + "mac-address"] = iface['mac_address']
+                        if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
+                            update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
+                        if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
+                            update_dict[path_item + ".ip-address"] = iface.get("ip_address").split(";")[0]
+
+            self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
+        else:
+            update_dict = {path_item + ".status": "DELETED"}
+            self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict, unset={path_vim_status: None})
 
-    def exec_vm(self, ro_task, task_index, task_depends):
-        task = ro_task["tasks"][task_index]
-        task_id = task["task_id"]
-        target_vim = self.my_vims[ro_task["target_id"]]
-        try:
-            params = task["params"]
-            params_copy = deepcopy(params)
-            params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
-                                                         params_copy.pop("schema_version"), params_copy.pop("salt"))
+    def _process_delete_db_tasks(self):
+        """
+        Delete task from database because vnfrs or nsrs or both have been deleted
+        :return: None. Uses and modify self.tasks_to_delete
+        """
+        while self.tasks_to_delete:
+            task = self.tasks_to_delete[0]
+            vnfrs_deleted = None
+            nsr_id = task["nsr_id"]
+            if task["target_record"].startswith("vnfrs:"):
+                # check if nsrs is present
+                if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
+                    vnfrs_deleted = task["target_record"].split(":")[1]
+            try:
+                self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
+            except Exception as e:
+                self.logger.error("Error deleting task={}: {}".format(task["task_id"], e))
+            self.tasks_to_delete.pop(0)
 
-            target_vim.inject_user_key(**params_copy)
-            self.logger.debug(
-                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
-            return "DONE", params_copy["key"]
-        except (vimconn.VimConnException, NsWorkerException) as e:
-            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
-            ro_vim_item_update = {"vim_details": str(e)}
-            return "FAILED", ro_vim_item_update
+    @staticmethod
+    def delete_db_tasks(db, nsr_id, vnfrs_deleted):
+        """
+        Static method because it is called from osm_ng_ro.ns
+        :param db: instance of database to use
+        :param nsr_id: affected nsrs id
+        :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
+        :return: None, exception is fails
+        """
+        retries = 5
+        for retry in range(retries):
+            ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+            now = time.time()
+            conflict = False
+            for ro_task in ro_tasks:
+                db_update = {}
+                to_delete_ro_task = True
+                for index, task in enumerate(ro_task["tasks"]):
+                    if not task:
+                        pass
+                    elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or \
+                            (vnfrs_deleted and task["target_record"].startswith("vnfrs:"+vnfrs_deleted)):
+                        db_update["tasks.{}".format(index)] = None
+                    else:
+                        to_delete_ro_task = False  # used by other nsr, ro_task cannot be deleted
+                # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
+                if to_delete_ro_task:
+                    if not db.del_one("ro_tasks",
+                                      q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+                                      fail_on_empty=False):
+                        conflict = True
+                elif db_update:
+                    db_update["modified_at"] = now
+                    if not db.set_one("ro_tasks",
+                                      q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+                                      update_dict=db_update,
+                                      fail_on_empty=False):
+                        conflict = True
+            if not conflict:
+                return
+        else:
+            raise NsWorkerException("Exceeded {} retries".format(retries))
 
     def run(self):
         # load database
         self.logger.debug("Starting")
         while True:
+            # step 1: get commands from queue
             try:
                 task = self.task_queue.get(block=False if self.my_vims else True)
                 if task[0] == "terminate":
                     break
-                if task[0] == "load_vim":
+                elif task[0] == "load_vim":
                     self._load_vim(task[1])
+                elif task[0] == "unload_vim":
+                    self._unload_vim(task[1])
+                elif task[0] == "reload_vim":
+                    self._reload_vim(task[1])
+                elif task[0] == "check_vim":
+                    self._check_vim(task[1])
                 continue
-            except queue.Empty:
-                pass
+            except Exception as e:
+                if isinstance(e, queue.Empty):
+                    pass
+                else:
+                    self.logger.critical("Error processing task: {}".format(e), exc_info=True)
 
+            # step 2: process pending_tasks, delete not needed tasks
             try:
+                if self.tasks_to_delete:
+                    self._process_delete_db_tasks()
                 busy = False
                 ro_task = self._get_db_task()
                 if ro_task:
-                    self._proccess_pending_tasks(ro_task)
+                    self._process_pending_tasks(ro_task)
                     busy = True
                 if not busy:
                     time.sleep(5)
index 4af2830..808fc0a 100644 (file)
@@ -35,14 +35,14 @@ tools.staticdir.dir: "/app/RO/RO-NG/osm_ng_ro/html_public"
 [global]
 # use env OSMRO_SERVER_XXX, OSMRO_LOG_XXX, OSMRO_TEST_XXX or OSMRO_AUTH_XXX to override. Use value in yaml format
 server.socket_host: "0.0.0.0"
-server.socket_port: 9998
+server.socket_port: 9090
 
 # server.ssl_module: "builtin"
 # server.ssl_certificate: "./http/cert.pem"
 # server.ssl_private_key: "./http/privkey.pem"
 # server.ssl_pass_phrase: "osm4u"
 server.thread_pool: 10
-server.ns_threads: 1
+server.ns_threads: 10
 
 # Uncomment for allow basic authentication apart from bearer
 # auth.allow_basic_authentication: True
@@ -69,7 +69,7 @@ name: "osm"
 
 [storage]
 # use env OSMRO_STORAGE_XXX to override
-driver: "local"            # local filesystem
+driver: None  # "local"            # local filesystem
 # for local provide file path
 path: "/app/storage"       #"/home/atierno/OSM/osm/NBI/local/storage"
 
index 35a93fe..768f205 100644 (file)
@@ -30,6 +30,7 @@ import sys
 
 from osm_ng_ro.ns import Ns, NsException
 from osm_ng_ro.validation import ValidationError
+from osm_ng_ro.vim_admin import VimAdminThread
 from osm_common.dbbase import DbException
 from osm_common.fsbase import FsException
 from osm_common.msgbase import MsgException
@@ -46,6 +47,8 @@ version_date = "May 2020"
 database_version = '1.2'
 auth_database_version = '1.0'
 ro_server = None           # instance of Server class
+vim_admin_thread = None   # instance of VimAdminThread class
+
 # vim_threads = None  # instance of VimThread class
 
 """
@@ -579,7 +582,7 @@ def _start_service():
     Set database, storage, message configuration
     Init database with admin/admin user password
     """
-    global ro_server
+    global ro_server, vim_admin_thread
     # global vim_threads
     cherrypy.log.error("Starting osm_ng_ro")
     # update general cherrypy configuration
@@ -603,11 +606,7 @@ def _start_service():
                 # update [/] configuration
                 engine_config["/"]["tools." + k2.replace('_', '.')] = yaml.safe_load(v)
             elif k1 in ("message", "database", "storage", "authentication"):
-                # update [message], [database], ... configuration
-                if k2 in ("port", "db_port"):
-                    engine_config[k1][k2] = int(v)
-                else:
-                    engine_config[k1][k2] = v
+                engine_config[k1][k2] = yaml.safe_load(v)
 
         except Exception as e:
             raise RoException("Cannot load env '{}': {}".format(k, e))
@@ -644,7 +643,6 @@ def _start_service():
     if engine_config["global"].get("log.level"):
         logger_cherry.setLevel(engine_config["global"]["log.level"])
         logger_nbi.setLevel(engine_config["global"]["log.level"])
-
     # logging other modules
     for k1, logname in {"message": "ro.msg", "database": "ro.db", "storage": "ro.fs"}.items():
         engine_config[k1]["logger_name"] = logname
@@ -665,16 +663,13 @@ def _start_service():
     cherrypy.tree.apps['/ro'].root.ns.init_db(target_version=database_version)
 
     # # start subscriptions thread:
-    # vim_threads = []
-    # for thread_id in range(engine_config["global"]["server.ns_threads"]):
-    #     vim_thread = VimThread(thread_id, config=engine_config, engine=ro_server.ns)
-    #     vim_thread.start()
-    #     vim_threads.append(vim_thread)
+    vim_admin_thread = VimAdminThread(config=engine_config, engine=ro_server.ns)
+    vim_admin_thread.start()
     # # Do not capture except SubscriptionException
 
-    backend = engine_config["authentication"]["backend"]
-    cherrypy.log.error("Starting OSM NBI Version '{} {}' with '{}' authentication backend"
-                       .format(ro_version, ro_version_date, backend))
+    backend = engine_config["authentication"]["backend"]
+    cherrypy.log.error("Starting OSM NBI Version '{} {}' with '{}' authentication backend"
+                       .format(ro_version, ro_version_date, backend))
 
 
 def _stop_service():
@@ -682,11 +677,11 @@ def _stop_service():
     Callback function called when cherrypy.engine stops
     TODO: Ending database connections.
     """
-    # global vim_threads
-    # if vim_threads:
-    #     for vim_thread in vim_threads:
-    #         vim_thread.terminate()
-    # vim_threads = None
+    global vim_admin_thread
+    # terminate vim_admin_thread
+    if vim_admin_thread:
+        vim_admin_thread.terminate()
+    vim_admin_thread = None
     cherrypy.tree.apps['/ro'].root.ns.stop()
     cherrypy.log.error("Stopping osm_ng_ro")
 
@@ -734,6 +729,8 @@ if __name__ == '__main__':
                 print("No configuration file 'ro.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                 exit(1)
         ro_main(config_file)
+    except KeyboardInterrupt:
+        print("KeyboardInterrupt. Finishing", file=sys.stderr)
     except getopt.GetoptError as e:
         print(str(e), file=sys.stderr)
         # usage()
index 060a3eb..54d8eed 100644 (file)
@@ -31,26 +31,50 @@ ssh_key_schema = {"type": "string", "minLength": 1}
 id_schema = {"type": "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"}
 bool_schema = {"type": "boolean"}
 null_schema = {"type": "null"}
+object_schema = {"type": "object"}
 
-image_schema = {
-    "title": "image input validation",
+deploy_item_schema = {
+    "title": "deploy item validation. Each vld, vdu, flavor, image, ...",
     "$schema": "http://json-schema.org/draft-04/schema#",
     "type": "object",
-    # TODO
+    "properties": {
+        "id": string_schema,
+        "vim_info": object_schema,
+        "common_id": string_schema,
+    },
+    "additionalProperties": True
 }
 
-flavor_schema = {
-    "title": "image input validation",
+deploy_item_list = {
+    "type": "array",
+    "items": deploy_item_schema,
+}
+
+deploy_vnf_schema = {
+    "title": "deploy.vnf.item validation",
     "$schema": "http://json-schema.org/draft-04/schema#",
     "type": "object",
-    # TODO
+    "properties": {
+        "_id": id_schema,
+        "vdur": deploy_item_list,
+        "vld": deploy_item_list,
+    },
+    "additionalProperties": True,
+    "required": ["_id"],
 }
 
-ns_schema = {
-    "title": "image input validation",
+deploy_action_schema = {
+    "title": "deploy.action validation",
     "$schema": "http://json-schema.org/draft-04/schema#",
     "type": "object",
-    # TODO
+    "properties": {
+        "action": {"enum": ["inject_ssh_key"]},
+        "key": ssh_key_schema,
+        "user": string_schema,
+        "password": string_schema,
+    },
+    "additionalProperties": False,
+    "required": ["action"],
 }
 
 deploy_schema = {
@@ -59,34 +83,22 @@ deploy_schema = {
     "type": "object",
     "properties": {
         "action_id": string_schema,
-        "name": name_schema,
-        "action": {"enum" ["inject_ssh_key"]},
-        "key": ssh_key_schema,
-        "user": name_schema,
-        "password": string_schema,
+        "cloud_init_content": object_schema,
+        "name": string_schema,
+        "action": deploy_action_schema,
         "vnf": {
-            "type": "object",
-            "properties": {
-                "_id": id_schema,
-                # TODO
-            },
-            "required": ["_id"],
-            "additionalProperties": True,
-        },
-        "image": {
             "type": "array",
-            "minItems": 1,
-            "items": image_schema
+            "items": deploy_vnf_schema,
         },
-        "flavor": {
-            "type": "array",
-            "minItems": 1,
-            "items": flavor_schema
+        "image": deploy_item_list,
+        "flavor": deploy_item_list,
+        "ns": {
+            "type": "object",
+            "properties": {
+                "vld": deploy_item_list,
+            }
         },
-        "ns": ns_schema,
     },
-
-    "required": ["name"],
     "additionalProperties": False
 }
 
diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py
new file mode 100644 (file)
index 0000000..d7a7c1d
--- /dev/null
@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module implements a thread that reads from kafka bus reading VIM messages.
+It is based on asyncio.
+It is in charge of load tasks assigned to VIMs that nobody is in chage of it
+"""
+
+import logging
+import threading
+import asyncio
+from http import HTTPStatus
+
+from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common.dbbase import DbException
+from osm_common.msgbase import MsgException
+from osm_ng_ro.ns import NsException
+from time import time
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class VimAdminException(Exception):
+
+    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
+        self.http_code = http_code
+        Exception.__init__(self, message)
+
+
+class VimAdminThread(threading.Thread):
+    MAX_TIME_LOCKED = 3600  # 1h
+    MAX_TIME_UNATTENDED = 60  # 600  # 10min
+    kafka_topics = ("vim_account", "wim_account", "sdn")
+
+    def __init__(self, config, engine):
+        """
+        Constructor of class
+        :param config: configuration parameters of database and messaging
+        :param engine: an instance of Engine class, used for deleting instances
+        """
+        threading.Thread.__init__(self)
+        self.to_terminate = False
+        self.config = config
+        self.db = None
+        self.msg = None
+        self.engine = engine
+        self.loop = None
+        self.last_rotask_time = 0
+        self.logger = logging.getLogger("ro.vimadmin")
+        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
+        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
+
+    async def vim_watcher(self):
+        """ Reads database periodically looking for tasks not processed by nobody because of a restar
+        in order to load this vim"""
+        while not self.to_terminate:
+            now = time()
+            if not self.last_rotask_time:
+                self.last_rotask_time = 0
+            ro_tasks = self.db.get_list("ro_tasks",
+                                        q_filter={"target_id.ncont": self.engine.assignment_list,
+                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                                  "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                                                  "to_check_at.gt": self.last_rotask_time,
+                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
+            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
+            for ro_task in ro_tasks:
+                if ro_task["target_id"] not in self.engine.assignment_list:
+                    self.engine.assign_vim(ro_task["target_id"])
+                    self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+
+            await asyncio.sleep(300, loop=self.loop)
+
+    async def aiomain(self):
+        kafka_working = True
+        while not self.to_terminate:
+            try:
+                if not self.aiomain_task_kafka:
+                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
+                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
+                    kafka_working = True
+                    self.logger.debug("Starting vim_account subscription task")
+                    self.aiomain_task_kafka = asyncio.ensure_future(
+                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
+                                         aiocallback=self._msg_callback),
+                        loop=self.loop)
+                if not self.aiomain_task_vim:
+                    self.aiomain_task_vim = asyncio.ensure_future(
+                        self.vim_watcher(),
+                        loop=self.loop)
+                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
+                                             timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                try:
+                    if self.aiomain_task_kafka in done:
+                        exc = self.aiomain_task_kafka.exception()
+                        self.logger.error("kafka subscription task exception: {}".format(exc))
+                        self.aiomain_task_kafka = None
+                    if self.aiomain_task_vim in done:
+                        exc = self.aiomain_task_vim.exception()
+                        self.logger.error("vim_account watcher task exception: {}".format(exc))
+                        self.aiomain_task_vim = None
+                except asyncio.CancelledError:
+                    pass
+
+            except Exception as e:
+                if self.to_terminate:
+                    return
+                if kafka_working:
+                    # logging only first time
+                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+                    kafka_working = False
+            await asyncio.sleep(10, loop=self.loop)
+
+    def run(self):
+        """
+        Start of the thread
+        :return: None
+        """
+        self.loop = asyncio.new_event_loop()
+        try:
+            if not self.db:
+                if self.config["database"]["driver"] == "mongo":
+                    self.db = dbmongo.DbMongo()
+                    self.db.db_connect(self.config["database"])
+                elif self.config["database"]["driver"] == "memory":
+                    self.db = dbmemory.DbMemory()
+                    self.db.db_connect(self.config["database"])
+                else:
+                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
+                        self.config["database"]["driver"]))
+            if not self.msg:
+                config_msg = self.config["message"].copy()
+                config_msg["loop"] = self.loop
+                if config_msg["driver"] == "local":
+                    self.msg = msglocal.MsgLocal()
+                    self.msg.connect(config_msg)
+                elif config_msg["driver"] == "kafka":
+                    self.msg = msgkafka.MsgKafka()
+                    self.msg.connect(config_msg)
+                else:
+                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
+                        config_msg["driver"]))
+        except (DbException, MsgException) as e:
+            raise VimAdminException(str(e), http_code=e.http_code)
+
+        self.logger.debug("Starting")
+        while not self.to_terminate:
+            try:
+                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
+            # except asyncio.CancelledError:
+            #     break  # if cancelled it should end, breaking loop
+            except Exception as e:
+                if not self.to_terminate:
+                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+
+        self.logger.debug("Finishing")
+        self._stop()
+        self.loop.close()
+
+    async def _msg_callback(self, topic, command, params):
+        """
+        Callback to process a received message from kafka
+        :param topic:  topic received
+        :param command:  command received
+        :param params: rest of parameters
+        :return: None
+        """
+        try:
+            if command == "echo":
+                return
+            if topic in self.kafka_topics:
+                target = topic[0:3]   # vim, wim or sdn
+                target_id = target + ":" + params["_id"]
+                if command in ("edited", "edit"):
+                    self.engine.reload_vim(target_id)
+                    self.logger.debug("ordered to reload {}".format(target_id))
+                elif command in ("deleted", "delete"):
+                    self.engine.unload_vim(target_id)
+                    self.logger.debug("ordered to unload {}".format(target_id))
+                elif command in ("create", "created"):
+                    self.engine.check_vim(target_id)
+                    self.logger.debug("ordered to check {}".format(target_id))
+
+        except (NsException, DbException, MsgException) as e:
+            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+        except Exception as e:
+            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
+                                  exc_info=True)
+
+    def _stop(self):
+        """
+        Close all connections
+        :return: None
+        """
+        try:
+            if self.db:
+                self.db.db_disconnect()
+            if self.msg:
+                self.msg.disconnect()
+        except (DbException, MsgException) as e:
+            raise VimAdminException(str(e), http_code=e.http_code)
+
+    def terminate(self):
+        """
+        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
+        but not immediately.
+        :return: None
+        """
+        self.to_terminate = True
+        if self.aiomain_task_kafka:
+            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+        if self.aiomain_task_vim:
+            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
index acc5ba8..0645e54 100644 (file)
@@ -1225,6 +1225,7 @@ class vimconnector(vimconn.VimConnector):
                 type: 'virtual', 'PCI-PASSTHROUGH'('PF'), 'SR-IOV'('VF'), 'VFnotShared'
                 vim_id: filled/added by this function
                 floating_ip: True/False (or it can be None)
+                port_security: True/False
             'cloud_config': (optional) dictionary with:
                 'key-pairs': (optional) list of strings with the public key to be inserted to the default user
                 'users': (optional) list of users to be inserted, each item is a dict with:
@@ -1706,7 +1707,8 @@ class vimconnector(vimconn.VimConnector):
                 else:
                     vm['status'] = "OTHER"
                     vm['error_msg'] = "VIM status reported " + vm_vim['status']
-
+                vm_vim.pop("OS-EXT-SRV-ATTR:user_data", None)
+                vm_vim.pop("user_data", None)
                 vm['vim_info'] = self.serialize(vm_vim)
 
                 vm["interfaces"] = []
index f7910c9..f60deea 100644 (file)
@@ -162,6 +162,7 @@ class SdnConnectorOpenFlow(SdnConnectorBase):
     def __init__(self, wim, wim_account, config=None, logger=None, of_connector=None):
         self.logger = logger or logging.getLogger('openmano.sdn.openflow_conn')
         self.of_connector = of_connector
+        config = config or {}
         self.of_controller_nets_with_same_vlan = config.get("of_controller_nets_with_same_vlan", False)
 
     def check_credentials(self):
index 619a679..a2d189a 100644 (file)
@@ -79,7 +79,7 @@ class SdnDummyConnector(SdnConnectorBase):
         """
         self.logger.debug("get_connectivity_service_status: service_uuid='{}' conn_info='{}'".format(service_uuid,
                                                                                                      conn_info))
-        return {'sdn_status': 'ACTIVE', 'sdn_info': self.connectivity.get(service_uuid)}
+        return {'sdn_status': 'ACTIVE', 'sdn_info': self.connections.get(service_uuid)}
 
     def create_connectivity_service(self, service_type, connection_points,
                                     **kwargs):
index 3189239..e90d213 100644 (file)
@@ -25,6 +25,7 @@ from osm_ro_plugin import vimconn
 from uuid import uuid4
 from copy import deepcopy
 import logging
+from random import randrange
 
 __author__ = "Alfonso Tierno"
 __date__ = "2020-04-20"
@@ -248,6 +249,12 @@ class VimDummyConnector(vimconn.VimConnector):
                 "vim_interface_id": str(iface_index),
                 "vim_net_id": iface["net_id"],
             }
+            if iface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH") and self.config.get("sdn-port-mapping"):
+                compute_index = randrange(len(self.config["sdn-port-mapping"]))
+                port_index = randrange(len(self.config["sdn-port-mapping"][compute_index]["ports"]))
+                interface["compute_node"] = self.config["sdn-port-mapping"][compute_index]["compute_node"]
+                interface["pci"] = self.config["sdn-port-mapping"][compute_index]["ports"][port_index]["pci"]
+
             interfaces.append(interface)
         vm = {
             "id": vm_id,
index abe3c5c..16ff008 100755 (executable)
 # then it checks if database is present and creates it if needed.
 # Finally it launches RO server.
 
+#  if New Generation RO just launch it
+if [ -n "$OSMRO_NG" ] ; then
+    python3 -m osm_ng_ro.ro_main || exit 1
+    exit 0
+fi
+
 [ -z "$RO_DB_OVIM_HOST" ] && export RO_DB_OVIM_HOST="$RO_DB_HOST"
 [ -z "$RO_DB_OVIM_ROOT_PASSWORD" ] && export RO_DB_OVIM_ROOT_PASSWORD="$RO_DB_ROOT_PASSWORD"