Refactor method deploy 74/11674/23
authorgallardo <sgallardor@indra.es>
Fri, 11 Feb 2022 12:41:59 +0000 (12:41 +0000)
committergallardo <sgallardor@indra.es>
Wed, 2 Mar 2022 14:29:04 +0000 (14:29 +0000)
This change removes the method `_process_items` and refactors the method
`deploy` to do the following tasks:

- `calculate_all_differences_to_deploy`: returns the list of items to be
  created and deleted
- `define_all_tasks`: creates all the task structures
- `upload_all_tasks`: saves all tasks in the common DB

Change-Id: I5cb59dd697875634a4a92df8673c9842a4578d5f
Signed-off-by: gallardo <sgallardor@indra.es>
NG-RO/osm_ng_ro/ns.py

index d187dd8..f3b073e 100644 (file)
@@ -108,6 +108,18 @@ class Ns(object):
         self.next_worker = 0
         self.plugins = {}
         self.workers = []
+        self.process_params_function_map = {
+            "net": Ns._process_net_params,
+            "image": Ns._process_image_params,
+            "flavor": Ns._process_flavor_params,
+            "vdu": Ns._process_vdu_params,
+        }
+        self.db_path_map = {
+            "net": "vld",
+            "image": "image",
+            "flavor": "flavor",
+            "vdu": "vdur",
+        }
 
     def init_db(self, target_version):
         pass
@@ -1081,6 +1093,379 @@ class Ns(object):
 
         return extra_dict
 
+    def calculate_diff_items(
+        self,
+        indata,
+        db_nsr,
+        db_ro_nsr,
+        db_nsr_update,
+        item,
+        tasks_by_target_record_id,
+        action_id,
+        nsr_id,
+        task_index,
+        vnfr_id=None,
+        vnfr=None,
+    ):
+        """Function that returns the incremental changes (creation, deletion)
+        related to a specific item `item` to be done. This function should be
+        called for NS instantiation, NS termination, NS update to add a new VNF
+        or a new VLD, remove a VNF or VLD, etc.
+        Item can be `net, `flavor`, `image` or `vdu`.
+        It takes a list of target items from indata (which came from the REST API)
+        and compares with the existing items from db_ro_nsr, identifying the
+        incremental changes to be done. During the comparison, it calls the method
+        `process_params` (which was passed as parameter, and is particular for each
+        `item`)
+
+        Args:
+            indata (Dict[str, Any]): deployment info
+            db_nsr: NSR record from DB
+            db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
+            db_nsr_update (Dict[str, Any]): NSR info to update in DB
+            item (str): element to process (net, vdu...)
+            tasks_by_target_record_id (Dict[str, Any]):
+                [<target_record_id>, <task>]
+            action_id (str): action id
+            nsr_id (str): NSR id
+            task_index (number): task index to add to task name
+            vnfr_id (str): VNFR id
+            vnfr (Dict[str, Any]): VNFR info
+
+        Returns:
+            List: list with the incremental changes (deletes, creates) for each item
+            number: current task index
+        """
+
+        diff_items = []
+        db_path = ""
+        db_record = ""
+        target_list = []
+        existing_list = []
+        process_params = None
+        vdu2cloud_init = indata.get("cloud_init_content") or {}
+        ro_nsr_public_key = db_ro_nsr["public_key"]
+
+        # According to the type of item, the path, the target_list,
+        # the existing_list and the method to process params are set
+        db_path = self.db_path_map[item]
+        process_params = self.process_params_function_map[item]
+        if item in ("net", "vdu"):
+            if vnfr is None:
+                db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+                target_list = indata.get("ns", []).get(db_path, []) 
+                existing_list = db_nsr.get(db_path, [])
+            else:
+                db_record = "vnfrs:{}:{}".format(vnfr_id, db_path)
+                target_vnf = next(
+                    (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id),
+                    None,
+                )
+                target_list = target_vnf.get(db_path, []) if target_vnf else []
+                existing_list = vnfr.get(db_path, [])
+        elif item in ("image", "flavor"):
+            db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+            target_list = indata.get(item, [])
+            existing_list = db_nsr.get(item, [])
+        else:
+            raise NsException("Item not supported: {}", item)
+
+        # ensure all the target_list elements has an "id". If not assign the index as id
+        if target_list is None:
+            target_list = []
+        for target_index, tl in enumerate(target_list):
+            if tl and not tl.get("id"):
+                tl["id"] = str(target_index)
+
+        # step 1 items (networks,vdus,...) to be deleted/updated
+        for item_index, existing_item in enumerate(existing_list):
+            target_item = next(
+                (t for t in target_list if t["id"] == existing_item["id"]),
+                None,
+            )
+
+            for target_vim, existing_viminfo in existing_item.get(
+                "vim_info", {}
+            ).items():
+                if existing_viminfo is None:
+                    continue
+
+                if target_item:
+                    target_viminfo = target_item.get("vim_info", {}).get(target_vim)
+                else:
+                    target_viminfo = None
+
+                if target_viminfo is None:
+                    # must be deleted
+                    self._assign_vim(target_vim)
+                    target_record_id = "{}.{}".format(db_record, existing_item["id"])
+                    item_ = item
+
+                    if target_vim.startswith("sdn"):
+                        # item must be sdn-net instead of net if target_vim is a sdn
+                        item_ = "sdn_net"
+                        target_record_id += ".sdn"
+
+                    deployment_info = {
+                        "action_id": action_id,
+                        "nsr_id": nsr_id,
+                        "task_index": task_index,
+                    }
+
+                    diff_items.append(
+                        {
+                            "deployment_info": deployment_info,
+                            "target_id": target_vim,
+                            "item": item_,
+                            "action": "DELETE",
+                            "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
+                            "target_record_id": target_record_id,
+                        }
+                    )
+                    task_index += 1
+
+        # step 2 items (networks,vdus,...) to be created
+        for target_item in target_list:
+            item_index = -1
+
+            for item_index, existing_item in enumerate(existing_list):
+                if existing_item["id"] == target_item["id"]:
+                    break
+            else:
+                item_index += 1
+                db_nsr_update[db_path + ".{}".format(item_index)] = target_item
+                existing_list.append(target_item)
+                existing_item = None
+
+            for target_vim, target_viminfo in target_item.get("vim_info", {}).items():
+                existing_viminfo = None
+
+                if existing_item:
+                    existing_viminfo = existing_item.get("vim_info", {}).get(target_vim)
+
+                if existing_viminfo is not None:
+                    continue
+
+                target_record_id = "{}.{}".format(db_record, target_item["id"])
+                item_ = item
+
+                if target_vim.startswith("sdn"):
+                    # item must be sdn-net instead of net if target_vim is a sdn
+                    item_ = "sdn_net"
+                    target_record_id += ".sdn"
+
+                kwargs = {}
+                if process_params == Ns._process_vdu_params:
+                    kwargs.update(
+                        {
+                            "vnfr_id": vnfr_id,
+                            "nsr_id": nsr_id,
+                            "vnfr": vnfr,
+                            "vdu2cloud_init": vdu2cloud_init,
+                            "tasks_by_target_record_id": tasks_by_target_record_id,
+                            "logger": self.logger,
+                            "db": self.db,
+                            "fs": self.fs,
+                            "ro_nsr_public_key": ro_nsr_public_key,
+                        }
+                    )
+
+                extra_dict = process_params(
+                    target_item,
+                    indata,
+                    target_viminfo,
+                    target_record_id,
+                    **kwargs,
+                )
+                self._assign_vim(target_vim)
+
+                deployment_info = {
+                    "action_id": action_id,
+                    "nsr_id": nsr_id,
+                    "task_index": task_index,
+                }
+
+                diff_items.append(
+                    {
+                        "deployment_info": deployment_info,
+                        "target_id": target_vim,
+                        "item": item_,
+                        "action": "CREATE",
+                        "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
+                        "target_record_id": target_record_id,
+                        "extra_dict": extra_dict,
+                        "common_id": target_item.get("common_id", None),
+                    }
+                )
+                task_index += 1
+
+                db_nsr_update[db_path + ".{}".format(item_index)] = target_item
+
+        return diff_items, task_index
+
+    def calculate_all_differences_to_deploy(
+        self,
+        indata,
+        nsr_id,
+        db_nsr,
+        db_vnfrs,
+        db_ro_nsr,
+        db_nsr_update,
+        db_vnfrs_update,
+        action_id,
+        tasks_by_target_record_id,
+    ):
+        """This method calculates the ordered list of items (`changes_list`)
+        to be created and deleted.
+
+        Args:
+            indata (Dict[str, Any]): deployment info
+            nsr_id (str): NSR id
+            db_nsr: NSR record from DB
+            db_vnfrs: VNFRS record from DB
+            db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
+            db_nsr_update (Dict[str, Any]): NSR info to update in DB
+            db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB
+            action_id (str): action id
+            tasks_by_target_record_id (Dict[str, Any]):
+                [<target_record_id>, <task>]
+
+        Returns:
+            List: ordered list of items to be created and deleted.
+        """
+
+        task_index = 0
+        # set list with diffs:
+        changes_list = []
+
+        # NS vld, image and flavor
+        for item in ["net", "image", "flavor"]:
+            self.logger.debug("process NS={} {}".format(nsr_id, item))
+            diff_items, task_index = self.calculate_diff_items(
+                indata=indata,
+                db_nsr=db_nsr,
+                db_ro_nsr=db_ro_nsr,
+                db_nsr_update=db_nsr_update,
+                item=item,
+                tasks_by_target_record_id=tasks_by_target_record_id,
+                action_id=action_id,
+                nsr_id=nsr_id,
+                task_index=task_index,
+                vnfr_id=None,
+            )
+            changes_list += diff_items
+
+        # VNF vlds and vdus
+        for vnfr_id, vnfr in db_vnfrs.items():
+            # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
+            for item in ["net", "vdu"]:
+                self.logger.debug("process VNF={} {}".format(vnfr_id, item))
+                diff_items, task_index = self.calculate_diff_items(
+                    indata=indata,
+                    db_nsr=db_nsr,
+                    db_ro_nsr=db_ro_nsr,
+                    db_nsr_update=db_vnfrs_update[vnfr["_id"]],
+                    item=item,
+                    tasks_by_target_record_id=tasks_by_target_record_id,
+                    action_id=action_id,
+                    nsr_id=nsr_id,
+                    task_index=task_index,
+                    vnfr_id=vnfr_id,
+                    vnfr=vnfr,
+                )
+                changes_list += diff_items
+
+        return changes_list
+
+    def define_all_tasks(
+        self,
+        changes_list,
+        db_new_tasks,
+        tasks_by_target_record_id,
+    ):
+        """Function to create all the task structures obtanied from
+        the method calculate_all_differences_to_deploy
+
+        Args:
+            changes_list (List): ordered list of items to be created or deleted
+            db_new_tasks (List): tasks list to be created
+            action_id (str): action id
+            tasks_by_target_record_id (Dict[str, Any]):
+                [<target_record_id>, <task>]
+
+        """
+
+        for change in changes_list:
+            task = Ns._create_task(
+                deployment_info=change["deployment_info"],
+                target_id=change["target_id"],
+                item=change["item"],
+                action=change["action"],
+                target_record=change["target_record"],
+                target_record_id=change["target_record_id"],
+                extra_dict=change.get("extra_dict", None),
+            )
+
+            tasks_by_target_record_id[change["target_record_id"]] = task
+            db_new_tasks.append(task)
+
+            if change.get("common_id"):
+                task["common_id"] = change["common_id"]
+
+    def upload_all_tasks(
+        self,
+        db_new_tasks,
+        now,
+    ):
+        """Function to save all tasks in the common DB
+
+        Args:
+            db_new_tasks (List): tasks list to be created
+            now (time): current time
+
+        """
+
+        nb_ro_tasks = 0  # for logging
+
+        for db_task in db_new_tasks:
+            target_id = db_task.pop("target_id")
+            common_id = db_task.get("common_id")
+
+            if common_id:
+                if self.db.set_one(
+                    "ro_tasks",
+                    q_filter={
+                        "target_id": target_id,
+                        "tasks.common_id": common_id,
+                    },
+                    update_dict={"to_check_at": now, "modified_at": now},
+                    push={"tasks": db_task},
+                    fail_on_empty=False,
+                ):
+                    continue
+
+            if not self.db.set_one(
+                "ro_tasks",
+                q_filter={
+                    "target_id": target_id,
+                    "tasks.target_record": db_task["target_record"],
+                },
+                update_dict={"to_check_at": now, "modified_at": now},
+                push={"tasks": db_task},
+                fail_on_empty=False,
+            ):
+                # Create a ro_task
+                self.logger.debug("Updating database, Creating ro_tasks")
+                db_ro_task = Ns._create_ro_task(target_id, db_task)
+                nb_ro_tasks += 1
+                self.db.create("ro_tasks", db_ro_task)
+
+        self.logger.debug(
+            "Created {} ro_tasks; {} tasks - db_new_tasks={}".format(
+                nb_ro_tasks, len(db_new_tasks), db_new_tasks
+            )
+        )
+
     def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
         self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
         validate_input(indata, deploy_schema)
@@ -1090,8 +1475,6 @@ class Ns(object):
         db_nsr_update = {}  # update operation on nsrs
         db_vnfrs_update = {}
         db_vnfrs = {}  # vnf's info indexed by _id
-        nb_ro_tasks = 0  # for logging
-        vdu2cloud_init = indata.get("cloud_init_content") or {}
         step = ""
         logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
         self.logger.debug(logging_text + "Enter")
@@ -1118,8 +1501,6 @@ class Ns(object):
             if not db_ro_nsr:
                 db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
 
-            ro_nsr_public_key = db_ro_nsr["public_key"]
-
             # check that action_id is not in the list of actions. Suffixed with :index
             if action_id in db_ro_nsr["actions"]:
                 index = 1
@@ -1137,168 +1518,6 @@ class Ns(object):
 
                     index += 1
 
-            def _process_items(
-                target_list,
-                existing_list,
-                db_record,
-                db_update,
-                db_path,
-                item,
-                process_params,
-            ):
-                nonlocal db_new_tasks
-                nonlocal tasks_by_target_record_id
-                nonlocal action_id
-                nonlocal nsr_id
-                nonlocal task_index
-                nonlocal indata
-
-                # ensure all the target_list elements has an "id". If not assign the index as id
-                for target_index, tl in enumerate(target_list):
-                    if tl and not tl.get("id"):
-                        tl["id"] = str(target_index)
-
-                # step 1 items (networks,vdus,...) to be deleted/updated
-                for item_index, existing_item in enumerate(existing_list):
-                    target_item = next(
-                        (t for t in target_list if t["id"] == existing_item["id"]), None
-                    )
-
-                    for target_vim, existing_viminfo in existing_item.get(
-                        "vim_info", {}
-                    ).items():
-                        if existing_viminfo is None:
-                            continue
-
-                        if target_item:
-                            target_viminfo = target_item.get("vim_info", {}).get(
-                                target_vim
-                            )
-                        else:
-                            target_viminfo = None
-
-                        if target_viminfo is None:
-                            # must be deleted
-                            self._assign_vim(target_vim)
-                            target_record_id = "{}.{}".format(
-                                db_record, existing_item["id"]
-                            )
-                            item_ = item
-
-                            if target_vim.startswith("sdn"):
-                                # item must be sdn-net instead of net if target_vim is a sdn
-                                item_ = "sdn_net"
-                                target_record_id += ".sdn"
-
-                            deployment_info = {
-                                "action_id": action_id,
-                                "nsr_id": nsr_id,
-                                "task_index": task_index,
-                            }
-
-                            task = Ns._create_task(
-                                deployment_info=deployment_info,
-                                target_id=target_vim,
-                                item=item_,
-                                action="DELETE",
-                                target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
-                                target_record_id=target_record_id,
-                            )
-
-                            task_index = deployment_info.get("task_index")
-
-                            tasks_by_target_record_id[target_record_id] = task
-                            db_new_tasks.append(task)
-                            # TODO delete
-                    # TODO check one by one the vims to be created/deleted
-
-                # step 2 items (networks,vdus,...) to be created
-                for target_item in target_list:
-                    item_index = -1
-
-                    for item_index, existing_item in enumerate(existing_list):
-                        if existing_item["id"] == target_item["id"]:
-                            break
-                    else:
-                        item_index += 1
-                        db_update[db_path + ".{}".format(item_index)] = target_item
-                        existing_list.append(target_item)
-                        existing_item = None
-
-                    for target_vim, target_viminfo in target_item.get(
-                        "vim_info", {}
-                    ).items():
-                        existing_viminfo = None
-
-                        if existing_item:
-                            existing_viminfo = existing_item.get("vim_info", {}).get(
-                                target_vim
-                            )
-
-                        # TODO check if different. Delete and create???
-                        # TODO delete if not exist
-                        if existing_viminfo is not None:
-                            continue
-
-                        target_record_id = "{}.{}".format(db_record, target_item["id"])
-                        item_ = item
-
-                        if target_vim.startswith("sdn"):
-                            # item must be sdn-net instead of net if target_vim is a sdn
-                            item_ = "sdn_net"
-                            target_record_id += ".sdn"
-
-                        kwargs = {}
-                        if process_params == Ns._process_vdu_params:
-                            kwargs.update(
-                                {
-                                    "vnfr_id": vnfr_id,
-                                    "nsr_id": nsr_id,
-                                    "vnfr": vnfr,
-                                    "vdu2cloud_init": vdu2cloud_init,
-                                    "tasks_by_target_record_id": tasks_by_target_record_id,
-                                    "logger": self.logger,
-                                    "db": self.db,
-                                    "fs": self.fs,
-                                    "ro_nsr_public_key": ro_nsr_public_key,
-                                }
-                            )
-
-                        extra_dict = process_params(
-                            target_item,
-                            indata,
-                            target_viminfo,
-                            target_record_id,
-                            **kwargs,
-                        )
-                        self._assign_vim(target_vim)
-
-                        deployment_info = {
-                            "action_id": action_id,
-                            "nsr_id": nsr_id,
-                            "task_index": task_index,
-                        }
-
-                        task = Ns._create_task(
-                            deployment_info=deployment_info,
-                            target_id=target_vim,
-                            item=item_,
-                            action="CREATE",
-                            target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
-                            target_record_id=target_record_id,
-                            extra_dict=extra_dict,
-                        )
-
-                        task_index = deployment_info.get("task_index")
-
-                        tasks_by_target_record_id[target_record_id] = task
-                        db_new_tasks.append(task)
-
-                        if target_item.get("common_id"):
-                            task["common_id"] = target_item["common_id"]
-
-                        db_update[db_path + ".{}".format(item_index)] = target_item
-
             def _process_action(indata):
                 nonlocal db_new_tasks
                 nonlocal action_id
@@ -1384,108 +1603,30 @@ class Ns(object):
                     _process_action(indata)
                 else:
                     # compute network differences
-                    # NS.vld
-                    step = "process NS VLDs"
-                    _process_items(
-                        target_list=indata["ns"]["vld"] or [],
-                        existing_list=db_nsr.get("vld") or [],
-                        db_record="nsrs:{}:vld".format(nsr_id),
-                        db_update=db_nsr_update,
-                        db_path="vld",
-                        item="net",
-                        process_params=Ns._process_net_params,
-                    )
-
-                    step = "process NS images"
-                    _process_items(
-                        target_list=indata.get("image") or [],
-                        existing_list=db_nsr.get("image") or [],
-                        db_record="nsrs:{}:image".format(nsr_id),
-                        db_update=db_nsr_update,
-                        db_path="image",
-                        item="image",
-                        process_params=Ns._process_image_params,
+                    # NS
+                    step = "process NS elements"
+                    changes_list = self.calculate_all_differences_to_deploy(
+                        indata=indata,
+                        nsr_id=nsr_id,
+                        db_nsr=db_nsr,
+                        db_vnfrs=db_vnfrs,
+                        db_ro_nsr=db_ro_nsr,
+                        db_nsr_update=db_nsr_update,
+                        db_vnfrs_update=db_vnfrs_update,
+                        action_id=action_id,
+                        tasks_by_target_record_id=tasks_by_target_record_id,
                     )
-
-                    step = "process NS flavors"
-                    _process_items(
-                        target_list=indata.get("flavor") or [],
-                        existing_list=db_nsr.get("flavor") or [],
-                        db_record="nsrs:{}:flavor".format(nsr_id),
-                        db_update=db_nsr_update,
-                        db_path="flavor",
-                        item="flavor",
-                        process_params=Ns._process_flavor_params,
+                    self.define_all_tasks(
+                        changes_list=changes_list,
+                        db_new_tasks=db_new_tasks,
+                        tasks_by_target_record_id=tasks_by_target_record_id,
                     )
 
-                    # VNF.vld
-                    for vnfr_id, vnfr in db_vnfrs.items():
-                        # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
-                        step = "process VNF={} VLDs".format(vnfr_id)
-                        target_vnf = next(
-                            (
-                                vnf
-                                for vnf in indata.get("vnf", ())
-                                if vnf["_id"] == vnfr_id
-                            ),
-                            None,
-                        )
-                        target_list = target_vnf.get("vld") if target_vnf else None
-                        _process_items(
-                            target_list=target_list or [],
-                            existing_list=vnfr.get("vld") or [],
-                            db_record="vnfrs:{}:vld".format(vnfr_id),
-                            db_update=db_vnfrs_update[vnfr["_id"]],
-                            db_path="vld",
-                            item="net",
-                            process_params=Ns._process_net_params,
-                        )
-
-                        target_list = target_vnf.get("vdur") if target_vnf else None
-                        step = "process VNF={} VDUs".format(vnfr_id)
-                        _process_items(
-                            target_list=target_list or [],
-                            existing_list=vnfr.get("vdur") or [],
-                            db_record="vnfrs:{}:vdur".format(vnfr_id),
-                            db_update=db_vnfrs_update[vnfr["_id"]],
-                            db_path="vdur",
-                            item="vdu",
-                            process_params=Ns._process_vdu_params,
-                        )
-
-                for db_task in db_new_tasks:
-                    step = "Updating database, Appending tasks to ro_tasks"
-                    target_id = db_task.pop("target_id")
-                    common_id = db_task.get("common_id")
-
-                    if common_id:
-                        if self.db.set_one(
-                            "ro_tasks",
-                            q_filter={
-                                "target_id": target_id,
-                                "tasks.common_id": common_id,
-                            },
-                            update_dict={"to_check_at": now, "modified_at": now},
-                            push={"tasks": db_task},
-                            fail_on_empty=False,
-                        ):
-                            continue
-
-                    if not self.db.set_one(
-                        "ro_tasks",
-                        q_filter={
-                            "target_id": target_id,
-                            "tasks.target_record": db_task["target_record"],
-                        },
-                        update_dict={"to_check_at": now, "modified_at": now},
-                        push={"tasks": db_task},
-                        fail_on_empty=False,
-                    ):
-                        # Create a ro_task
-                        step = "Updating database, Creating ro_tasks"
-                        db_ro_task = Ns._create_ro_task(target_id, db_task)
-                        nb_ro_tasks += 1
-                        self.db.create("ro_tasks", db_ro_task)
+                step = "Updating database, Appending tasks to ro_tasks"
+                self.upload_all_tasks(
+                    db_new_tasks=db_new_tasks,
+                    now=now,
+                )
 
                 step = "Updating database, nsrs"
                 if db_nsr_update:
@@ -1497,10 +1638,7 @@ class Ns(object):
                         self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
 
             self.logger.debug(
-                logging_text
-                + "Exit. Created {} ro_tasks; {} tasks".format(
-                    nb_ro_tasks, len(db_new_tasks)
-                )
+                logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
             )
 
             return (