+ def calculate_diff_items(
+ self,
+ indata,
+ db_nsr,
+ db_ro_nsr,
+ db_nsr_update,
+ item,
+ tasks_by_target_record_id,
+ action_id,
+ nsr_id,
+ task_index,
+ vnfr_id=None,
+ vnfr=None,
+ ):
+ """Function that returns the incremental changes (creation, deletion)
+ related to a specific item `item` to be done. This function should be
+ called for NS instantiation, NS termination, NS update to add a new VNF
+ or a new VLD, remove a VNF or VLD, etc.
+ Item can be `net, `flavor`, `image` or `vdu`.
+ It takes a list of target items from indata (which came from the REST API)
+ and compares with the existing items from db_ro_nsr, identifying the
+ incremental changes to be done. During the comparison, it calls the method
+ `process_params` (which was passed as parameter, and is particular for each
+ `item`)
+
+ Args:
+ indata (Dict[str, Any]): deployment info
+ db_nsr: NSR record from DB
+ db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
+ db_nsr_update (Dict[str, Any]): NSR info to update in DB
+ item (str): element to process (net, vdu...)
+ tasks_by_target_record_id (Dict[str, Any]):
+ [<target_record_id>, <task>]
+ action_id (str): action id
+ nsr_id (str): NSR id
+ task_index (number): task index to add to task name
+ vnfr_id (str): VNFR id
+ vnfr (Dict[str, Any]): VNFR info
+
+ Returns:
+ List: list with the incremental changes (deletes, creates) for each item
+ number: current task index
+ """
+
+ diff_items = []
+ db_path = ""
+ db_record = ""
+ target_list = []
+ existing_list = []
+ process_params = None
+ vdu2cloud_init = indata.get("cloud_init_content") or {}
+ ro_nsr_public_key = db_ro_nsr["public_key"]
+
+ # According to the type of item, the path, the target_list,
+ # the existing_list and the method to process params are set
+ db_path = self.db_path_map[item]
+ process_params = self.process_params_function_map[item]
+ if item in ("net", "vdu"):
+ if vnfr is None:
+ db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+ target_list = indata.get("ns", []).get(db_path, [])
+ existing_list = db_nsr.get(db_path, [])
+ else:
+ db_record = "vnfrs:{}:{}".format(vnfr_id, db_path)
+ target_vnf = next(
+ (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id),
+ None,
+ )
+ target_list = target_vnf.get(db_path, []) if target_vnf else []
+ existing_list = vnfr.get(db_path, [])
+ elif item in ("image", "flavor"):
+ db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+ target_list = indata.get(item, [])
+ existing_list = db_nsr.get(item, [])
+ else:
+ raise NsException("Item not supported: {}", item)
+
+ # ensure all the target_list elements has an "id". If not assign the index as id
+ if target_list is None:
+ target_list = []
+ for target_index, tl in enumerate(target_list):
+ if tl and not tl.get("id"):
+ tl["id"] = str(target_index)
+
+ # step 1 items (networks,vdus,...) to be deleted/updated
+ for item_index, existing_item in enumerate(existing_list):
+ target_item = next(
+ (t for t in target_list if t["id"] == existing_item["id"]),
+ None,
+ )
+
+ for target_vim, existing_viminfo in existing_item.get(
+ "vim_info", {}
+ ).items():
+ if existing_viminfo is None:
+ continue
+
+ if target_item:
+ target_viminfo = target_item.get("vim_info", {}).get(target_vim)
+ else:
+ target_viminfo = None
+
+ if target_viminfo is None:
+ # must be deleted
+ self._assign_vim(target_vim)
+ target_record_id = "{}.{}".format(db_record, existing_item["id"])
+ item_ = item
+
+ if target_vim.startswith("sdn"):
+ # item must be sdn-net instead of net if target_vim is a sdn
+ item_ = "sdn_net"
+ target_record_id += ".sdn"
+
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ diff_items.append(
+ {
+ "deployment_info": deployment_info,
+ "target_id": target_vim,
+ "item": item_,
+ "action": "DELETE",
+ "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
+ "target_record_id": target_record_id,
+ }
+ )
+ task_index += 1
+
+ # step 2 items (networks,vdus,...) to be created
+ for target_item in target_list:
+ item_index = -1
+
+ for item_index, existing_item in enumerate(existing_list):
+ if existing_item["id"] == target_item["id"]:
+ break
+ else:
+ item_index += 1
+ db_nsr_update[db_path + ".{}".format(item_index)] = target_item
+ existing_list.append(target_item)
+ existing_item = None
+
+ for target_vim, target_viminfo in target_item.get("vim_info", {}).items():
+ existing_viminfo = None
+
+ if existing_item:
+ existing_viminfo = existing_item.get("vim_info", {}).get(target_vim)
+
+ if existing_viminfo is not None:
+ continue
+
+ target_record_id = "{}.{}".format(db_record, target_item["id"])
+ item_ = item
+
+ if target_vim.startswith("sdn"):
+ # item must be sdn-net instead of net if target_vim is a sdn
+ item_ = "sdn_net"
+ target_record_id += ".sdn"
+
+ kwargs = {}
+ if process_params == Ns._process_vdu_params:
+ kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "vnfr": vnfr,
+ "vdu2cloud_init": vdu2cloud_init,
+ "tasks_by_target_record_id": tasks_by_target_record_id,
+ "logger": self.logger,
+ "db": self.db,
+ "fs": self.fs,
+ "ro_nsr_public_key": ro_nsr_public_key,
+ }
+ )
+
+ extra_dict = process_params(
+ target_item,
+ indata,
+ target_viminfo,
+ target_record_id,
+ **kwargs,
+ )
+ self._assign_vim(target_vim)
+
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ diff_items.append(
+ {
+ "deployment_info": deployment_info,
+ "target_id": target_vim,
+ "item": item_,
+ "action": "CREATE",
+ "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
+ "target_record_id": target_record_id,
+ "extra_dict": extra_dict,
+ "common_id": target_item.get("common_id", None),
+ }
+ )
+ task_index += 1
+
+ db_nsr_update[db_path + ".{}".format(item_index)] = target_item
+
+ return diff_items, task_index
+
+ def calculate_all_differences_to_deploy(
+ self,
+ indata,
+ nsr_id,
+ db_nsr,
+ db_vnfrs,
+ db_ro_nsr,
+ db_nsr_update,
+ db_vnfrs_update,
+ action_id,
+ tasks_by_target_record_id,
+ ):
+ """This method calculates the ordered list of items (`changes_list`)
+ to be created and deleted.
+
+ Args:
+ indata (Dict[str, Any]): deployment info
+ nsr_id (str): NSR id
+ db_nsr: NSR record from DB
+ db_vnfrs: VNFRS record from DB
+ db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
+ db_nsr_update (Dict[str, Any]): NSR info to update in DB
+ db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB
+ action_id (str): action id
+ tasks_by_target_record_id (Dict[str, Any]):
+ [<target_record_id>, <task>]
+
+ Returns:
+ List: ordered list of items to be created and deleted.
+ """
+
+ task_index = 0
+ # set list with diffs:
+ changes_list = []
+
+ # NS vld, image and flavor
+ for item in ["net", "image", "flavor"]:
+ self.logger.debug("process NS={} {}".format(nsr_id, item))
+ diff_items, task_index = self.calculate_diff_items(
+ indata=indata,
+ db_nsr=db_nsr,
+ db_ro_nsr=db_ro_nsr,
+ db_nsr_update=db_nsr_update,
+ item=item,
+ tasks_by_target_record_id=tasks_by_target_record_id,
+ action_id=action_id,
+ nsr_id=nsr_id,
+ task_index=task_index,
+ vnfr_id=None,
+ )
+ changes_list += diff_items
+
+ # VNF vlds and vdus
+ for vnfr_id, vnfr in db_vnfrs.items():
+ # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
+ for item in ["net", "vdu"]:
+ self.logger.debug("process VNF={} {}".format(vnfr_id, item))
+ diff_items, task_index = self.calculate_diff_items(
+ indata=indata,
+ db_nsr=db_nsr,
+ db_ro_nsr=db_ro_nsr,
+ db_nsr_update=db_vnfrs_update[vnfr["_id"]],
+ item=item,
+ tasks_by_target_record_id=tasks_by_target_record_id,
+ action_id=action_id,
+ nsr_id=nsr_id,
+ task_index=task_index,
+ vnfr_id=vnfr_id,
+ vnfr=vnfr,
+ )
+ changes_list += diff_items
+
+ return changes_list
+
+ def define_all_tasks(
+ self,
+ changes_list,
+ db_new_tasks,
+ tasks_by_target_record_id,
+ ):
+ """Function to create all the task structures obtanied from
+ the method calculate_all_differences_to_deploy
+
+ Args:
+ changes_list (List): ordered list of items to be created or deleted
+ db_new_tasks (List): tasks list to be created
+ action_id (str): action id
+ tasks_by_target_record_id (Dict[str, Any]):
+ [<target_record_id>, <task>]
+
+ """
+
+ for change in changes_list:
+ task = Ns._create_task(
+ deployment_info=change["deployment_info"],
+ target_id=change["target_id"],
+ item=change["item"],
+ action=change["action"],
+ target_record=change["target_record"],
+ target_record_id=change["target_record_id"],
+ extra_dict=change.get("extra_dict", None),
+ )
+
+ tasks_by_target_record_id[change["target_record_id"]] = task
+ db_new_tasks.append(task)
+
+ if change.get("common_id"):
+ task["common_id"] = change["common_id"]
+
+ def upload_all_tasks(
+ self,
+ db_new_tasks,
+ now,
+ ):
+ """Function to save all tasks in the common DB
+
+ Args:
+ db_new_tasks (List): tasks list to be created
+ now (time): current time
+
+ """
+
+ nb_ro_tasks = 0 # for logging
+
+ for db_task in db_new_tasks:
+ target_id = db_task.pop("target_id")
+ common_id = db_task.get("common_id")
+
+ if common_id:
+ if self.db.set_one(
+ "ro_tasks",
+ q_filter={
+ "target_id": target_id,
+ "tasks.common_id": common_id,
+ },
+ update_dict={"to_check_at": now, "modified_at": now},
+ push={"tasks": db_task},
+ fail_on_empty=False,
+ ):
+ continue
+
+ if not self.db.set_one(
+ "ro_tasks",
+ q_filter={
+ "target_id": target_id,
+ "tasks.target_record": db_task["target_record"],
+ },
+ update_dict={"to_check_at": now, "modified_at": now},
+ push={"tasks": db_task},
+ fail_on_empty=False,
+ ):
+ # Create a ro_task
+ self.logger.debug("Updating database, Creating ro_tasks")
+ db_ro_task = Ns._create_ro_task(target_id, db_task)
+ nb_ro_tasks += 1
+ self.db.create("ro_tasks", db_ro_task)
+
+ self.logger.debug(
+ "Created {} ro_tasks; {} tasks - db_new_tasks={}".format(
+ nb_ro_tasks, len(db_new_tasks), db_new_tasks
+ )
+ )
+