self.next_worker = 0
self.plugins = {}
self.workers = []
+ self.process_params_function_map = {
+ "net": Ns._process_net_params,
+ "image": Ns._process_image_params,
+ "flavor": Ns._process_flavor_params,
+ "vdu": Ns._process_vdu_params,
+ }
+ self.db_path_map = {
+ "net": "vld",
+ "image": "image",
+ "flavor": "flavor",
+ "vdu": "vdur",
+ }
def init_db(self, target_version):
pass
return extra_dict
+ def calculate_diff_items(
+ self,
+ indata,
+ db_nsr,
+ db_ro_nsr,
+ db_nsr_update,
+ item,
+ tasks_by_target_record_id,
+ action_id,
+ nsr_id,
+ task_index,
+ vnfr_id=None,
+ vnfr=None,
+ ):
+ """Function that returns the incremental changes (creation, deletion)
+ related to a specific item `item` to be done. This function should be
+ called for NS instantiation, NS termination, NS update to add a new VNF
+ or a new VLD, remove a VNF or VLD, etc.
+ Item can be `net, `flavor`, `image` or `vdu`.
+ It takes a list of target items from indata (which came from the REST API)
+ and compares with the existing items from db_ro_nsr, identifying the
+ incremental changes to be done. During the comparison, it calls the method
+ `process_params` (which was passed as parameter, and is particular for each
+ `item`)
+
+ Args:
+ indata (Dict[str, Any]): deployment info
+ db_nsr: NSR record from DB
+ db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
+ db_nsr_update (Dict[str, Any]): NSR info to update in DB
+ item (str): element to process (net, vdu...)
+ tasks_by_target_record_id (Dict[str, Any]):
+ [<target_record_id>, <task>]
+ action_id (str): action id
+ nsr_id (str): NSR id
+ task_index (number): task index to add to task name
+ vnfr_id (str): VNFR id
+ vnfr (Dict[str, Any]): VNFR info
+
+ Returns:
+ List: list with the incremental changes (deletes, creates) for each item
+ number: current task index
+ """
+
+ diff_items = []
+ db_path = ""
+ db_record = ""
+ target_list = []
+ existing_list = []
+ process_params = None
+ vdu2cloud_init = indata.get("cloud_init_content") or {}
+ ro_nsr_public_key = db_ro_nsr["public_key"]
+
+ # According to the type of item, the path, the target_list,
+ # the existing_list and the method to process params are set
+ db_path = self.db_path_map[item]
+ process_params = self.process_params_function_map[item]
+ if item in ("net", "vdu"):
+ if vnfr is None:
+ db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+ target_list = indata.get("ns", []).get(db_path, [])
+ existing_list = db_nsr.get(db_path, [])
+ else:
+ db_record = "vnfrs:{}:{}".format(vnfr_id, db_path)
+ target_vnf = next(
+ (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id),
+ None,
+ )
+ target_list = target_vnf.get(db_path, []) if target_vnf else []
+ existing_list = vnfr.get(db_path, [])
+ elif item in ("image", "flavor"):
+ db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+ target_list = indata.get(item, [])
+ existing_list = db_nsr.get(item, [])
+ else:
+ raise NsException("Item not supported: {}", item)
+
+ # ensure all the target_list elements has an "id". If not assign the index as id
+ if target_list is None:
+ target_list = []
+ for target_index, tl in enumerate(target_list):
+ if tl and not tl.get("id"):
+ tl["id"] = str(target_index)
+
+ # step 1 items (networks,vdus,...) to be deleted/updated
+ for item_index, existing_item in enumerate(existing_list):
+ target_item = next(
+ (t for t in target_list if t["id"] == existing_item["id"]),
+ None,
+ )
+
+ for target_vim, existing_viminfo in existing_item.get(
+ "vim_info", {}
+ ).items():
+ if existing_viminfo is None:
+ continue
+
+ if target_item:
+ target_viminfo = target_item.get("vim_info", {}).get(target_vim)
+ else:
+ target_viminfo = None
+
+ if target_viminfo is None:
+ # must be deleted
+ self._assign_vim(target_vim)
+ target_record_id = "{}.{}".format(db_record, existing_item["id"])
+ item_ = item
+
+ if target_vim.startswith("sdn"):
+ # item must be sdn-net instead of net if target_vim is a sdn
+ item_ = "sdn_net"
+ target_record_id += ".sdn"
+
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ diff_items.append(
+ {
+ "deployment_info": deployment_info,
+ "target_id": target_vim,
+ "item": item_,
+ "action": "DELETE",
+ "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
+ "target_record_id": target_record_id,
+ }
+ )
+ task_index += 1
+
+ # step 2 items (networks,vdus,...) to be created
+ for target_item in target_list:
+ item_index = -1
+
+ for item_index, existing_item in enumerate(existing_list):
+ if existing_item["id"] == target_item["id"]:
+ break
+ else:
+ item_index += 1
+ db_nsr_update[db_path + ".{}".format(item_index)] = target_item
+ existing_list.append(target_item)
+ existing_item = None
+
+ for target_vim, target_viminfo in target_item.get("vim_info", {}).items():
+ existing_viminfo = None
+
+ if existing_item:
+ existing_viminfo = existing_item.get("vim_info", {}).get(target_vim)
+
+ if existing_viminfo is not None:
+ continue
+
+ target_record_id = "{}.{}".format(db_record, target_item["id"])
+ item_ = item
+
+ if target_vim.startswith("sdn"):
+ # item must be sdn-net instead of net if target_vim is a sdn
+ item_ = "sdn_net"
+ target_record_id += ".sdn"
+
+ kwargs = {}
+ if process_params == Ns._process_vdu_params:
+ kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "vnfr": vnfr,
+ "vdu2cloud_init": vdu2cloud_init,
+ "tasks_by_target_record_id": tasks_by_target_record_id,
+ "logger": self.logger,
+ "db": self.db,
+ "fs": self.fs,
+ "ro_nsr_public_key": ro_nsr_public_key,
+ }
+ )
+
+ extra_dict = process_params(
+ target_item,
+ indata,
+ target_viminfo,
+ target_record_id,
+ **kwargs,
+ )
+ self._assign_vim(target_vim)
+
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ diff_items.append(
+ {
+ "deployment_info": deployment_info,
+ "target_id": target_vim,
+ "item": item_,
+ "action": "CREATE",
+ "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
+ "target_record_id": target_record_id,
+ "extra_dict": extra_dict,
+ "common_id": target_item.get("common_id", None),
+ }
+ )
+ task_index += 1
+
+ db_nsr_update[db_path + ".{}".format(item_index)] = target_item
+
+ return diff_items, task_index
+
+ def calculate_all_differences_to_deploy(
+ self,
+ indata,
+ nsr_id,
+ db_nsr,
+ db_vnfrs,
+ db_ro_nsr,
+ db_nsr_update,
+ db_vnfrs_update,
+ action_id,
+ tasks_by_target_record_id,
+ ):
+ """This method calculates the ordered list of items (`changes_list`)
+ to be created and deleted.
+
+ Args:
+ indata (Dict[str, Any]): deployment info
+ nsr_id (str): NSR id
+ db_nsr: NSR record from DB
+ db_vnfrs: VNFRS record from DB
+ db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
+ db_nsr_update (Dict[str, Any]): NSR info to update in DB
+ db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB
+ action_id (str): action id
+ tasks_by_target_record_id (Dict[str, Any]):
+ [<target_record_id>, <task>]
+
+ Returns:
+ List: ordered list of items to be created and deleted.
+ """
+
+ task_index = 0
+ # set list with diffs:
+ changes_list = []
+
+ # NS vld, image and flavor
+ for item in ["net", "image", "flavor"]:
+ self.logger.debug("process NS={} {}".format(nsr_id, item))
+ diff_items, task_index = self.calculate_diff_items(
+ indata=indata,
+ db_nsr=db_nsr,
+ db_ro_nsr=db_ro_nsr,
+ db_nsr_update=db_nsr_update,
+ item=item,
+ tasks_by_target_record_id=tasks_by_target_record_id,
+ action_id=action_id,
+ nsr_id=nsr_id,
+ task_index=task_index,
+ vnfr_id=None,
+ )
+ changes_list += diff_items
+
+ # VNF vlds and vdus
+ for vnfr_id, vnfr in db_vnfrs.items():
+ # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
+ for item in ["net", "vdu"]:
+ self.logger.debug("process VNF={} {}".format(vnfr_id, item))
+ diff_items, task_index = self.calculate_diff_items(
+ indata=indata,
+ db_nsr=db_nsr,
+ db_ro_nsr=db_ro_nsr,
+ db_nsr_update=db_vnfrs_update[vnfr["_id"]],
+ item=item,
+ tasks_by_target_record_id=tasks_by_target_record_id,
+ action_id=action_id,
+ nsr_id=nsr_id,
+ task_index=task_index,
+ vnfr_id=vnfr_id,
+ vnfr=vnfr,
+ )
+ changes_list += diff_items
+
+ return changes_list
+
+ def define_all_tasks(
+ self,
+ changes_list,
+ db_new_tasks,
+ tasks_by_target_record_id,
+ ):
+ """Function to create all the task structures obtanied from
+ the method calculate_all_differences_to_deploy
+
+ Args:
+ changes_list (List): ordered list of items to be created or deleted
+ db_new_tasks (List): tasks list to be created
+ action_id (str): action id
+ tasks_by_target_record_id (Dict[str, Any]):
+ [<target_record_id>, <task>]
+
+ """
+
+ for change in changes_list:
+ task = Ns._create_task(
+ deployment_info=change["deployment_info"],
+ target_id=change["target_id"],
+ item=change["item"],
+ action=change["action"],
+ target_record=change["target_record"],
+ target_record_id=change["target_record_id"],
+ extra_dict=change.get("extra_dict", None),
+ )
+
+ tasks_by_target_record_id[change["target_record_id"]] = task
+ db_new_tasks.append(task)
+
+ if change.get("common_id"):
+ task["common_id"] = change["common_id"]
+
+ def upload_all_tasks(
+ self,
+ db_new_tasks,
+ now,
+ ):
+ """Function to save all tasks in the common DB
+
+ Args:
+ db_new_tasks (List): tasks list to be created
+ now (time): current time
+
+ """
+
+ nb_ro_tasks = 0 # for logging
+
+ for db_task in db_new_tasks:
+ target_id = db_task.pop("target_id")
+ common_id = db_task.get("common_id")
+
+ if common_id:
+ if self.db.set_one(
+ "ro_tasks",
+ q_filter={
+ "target_id": target_id,
+ "tasks.common_id": common_id,
+ },
+ update_dict={"to_check_at": now, "modified_at": now},
+ push={"tasks": db_task},
+ fail_on_empty=False,
+ ):
+ continue
+
+ if not self.db.set_one(
+ "ro_tasks",
+ q_filter={
+ "target_id": target_id,
+ "tasks.target_record": db_task["target_record"],
+ },
+ update_dict={"to_check_at": now, "modified_at": now},
+ push={"tasks": db_task},
+ fail_on_empty=False,
+ ):
+ # Create a ro_task
+ self.logger.debug("Updating database, Creating ro_tasks")
+ db_ro_task = Ns._create_ro_task(target_id, db_task)
+ nb_ro_tasks += 1
+ self.db.create("ro_tasks", db_ro_task)
+
+ self.logger.debug(
+ "Created {} ro_tasks; {} tasks - db_new_tasks={}".format(
+ nb_ro_tasks, len(db_new_tasks), db_new_tasks
+ )
+ )
+
def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
validate_input(indata, deploy_schema)
db_nsr_update = {} # update operation on nsrs
db_vnfrs_update = {}
db_vnfrs = {} # vnf's info indexed by _id
- nb_ro_tasks = 0 # for logging
- vdu2cloud_init = indata.get("cloud_init_content") or {}
step = ""
logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
self.logger.debug(logging_text + "Enter")
if not db_ro_nsr:
db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
- ro_nsr_public_key = db_ro_nsr["public_key"]
-
# check that action_id is not in the list of actions. Suffixed with :index
if action_id in db_ro_nsr["actions"]:
index = 1
index += 1
- def _process_items(
- target_list,
- existing_list,
- db_record,
- db_update,
- db_path,
- item,
- process_params,
- ):
- nonlocal db_new_tasks
- nonlocal tasks_by_target_record_id
- nonlocal action_id
- nonlocal nsr_id
- nonlocal task_index
- nonlocal indata
-
- # ensure all the target_list elements has an "id". If not assign the index as id
- for target_index, tl in enumerate(target_list):
- if tl and not tl.get("id"):
- tl["id"] = str(target_index)
-
- # step 1 items (networks,vdus,...) to be deleted/updated
- for item_index, existing_item in enumerate(existing_list):
- target_item = next(
- (t for t in target_list if t["id"] == existing_item["id"]), None
- )
-
- for target_vim, existing_viminfo in existing_item.get(
- "vim_info", {}
- ).items():
- if existing_viminfo is None:
- continue
-
- if target_item:
- target_viminfo = target_item.get("vim_info", {}).get(
- target_vim
- )
- else:
- target_viminfo = None
-
- if target_viminfo is None:
- # must be deleted
- self._assign_vim(target_vim)
- target_record_id = "{}.{}".format(
- db_record, existing_item["id"]
- )
- item_ = item
-
- if target_vim.startswith("sdn"):
- # item must be sdn-net instead of net if target_vim is a sdn
- item_ = "sdn_net"
- target_record_id += ".sdn"
-
- deployment_info = {
- "action_id": action_id,
- "nsr_id": nsr_id,
- "task_index": task_index,
- }
-
- task = Ns._create_task(
- deployment_info=deployment_info,
- target_id=target_vim,
- item=item_,
- action="DELETE",
- target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
- target_record_id=target_record_id,
- )
-
- task_index = deployment_info.get("task_index")
-
- tasks_by_target_record_id[target_record_id] = task
- db_new_tasks.append(task)
- # TODO delete
- # TODO check one by one the vims to be created/deleted
-
- # step 2 items (networks,vdus,...) to be created
- for target_item in target_list:
- item_index = -1
-
- for item_index, existing_item in enumerate(existing_list):
- if existing_item["id"] == target_item["id"]:
- break
- else:
- item_index += 1
- db_update[db_path + ".{}".format(item_index)] = target_item
- existing_list.append(target_item)
- existing_item = None
-
- for target_vim, target_viminfo in target_item.get(
- "vim_info", {}
- ).items():
- existing_viminfo = None
-
- if existing_item:
- existing_viminfo = existing_item.get("vim_info", {}).get(
- target_vim
- )
-
- # TODO check if different. Delete and create???
- # TODO delete if not exist
- if existing_viminfo is not None:
- continue
-
- target_record_id = "{}.{}".format(db_record, target_item["id"])
- item_ = item
-
- if target_vim.startswith("sdn"):
- # item must be sdn-net instead of net if target_vim is a sdn
- item_ = "sdn_net"
- target_record_id += ".sdn"
-
- kwargs = {}
- if process_params == Ns._process_vdu_params:
- kwargs.update(
- {
- "vnfr_id": vnfr_id,
- "nsr_id": nsr_id,
- "vnfr": vnfr,
- "vdu2cloud_init": vdu2cloud_init,
- "tasks_by_target_record_id": tasks_by_target_record_id,
- "logger": self.logger,
- "db": self.db,
- "fs": self.fs,
- "ro_nsr_public_key": ro_nsr_public_key,
- }
- )
-
- extra_dict = process_params(
- target_item,
- indata,
- target_viminfo,
- target_record_id,
- **kwargs,
- )
- self._assign_vim(target_vim)
-
- deployment_info = {
- "action_id": action_id,
- "nsr_id": nsr_id,
- "task_index": task_index,
- }
-
- task = Ns._create_task(
- deployment_info=deployment_info,
- target_id=target_vim,
- item=item_,
- action="CREATE",
- target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
- target_record_id=target_record_id,
- extra_dict=extra_dict,
- )
-
- task_index = deployment_info.get("task_index")
-
- tasks_by_target_record_id[target_record_id] = task
- db_new_tasks.append(task)
-
- if target_item.get("common_id"):
- task["common_id"] = target_item["common_id"]
-
- db_update[db_path + ".{}".format(item_index)] = target_item
-
def _process_action(indata):
nonlocal db_new_tasks
nonlocal action_id
_process_action(indata)
else:
# compute network differences
- # NS.vld
- step = "process NS VLDs"
- _process_items(
- target_list=indata["ns"]["vld"] or [],
- existing_list=db_nsr.get("vld") or [],
- db_record="nsrs:{}:vld".format(nsr_id),
- db_update=db_nsr_update,
- db_path="vld",
- item="net",
- process_params=Ns._process_net_params,
- )
-
- step = "process NS images"
- _process_items(
- target_list=indata.get("image") or [],
- existing_list=db_nsr.get("image") or [],
- db_record="nsrs:{}:image".format(nsr_id),
- db_update=db_nsr_update,
- db_path="image",
- item="image",
- process_params=Ns._process_image_params,
+ # NS
+ step = "process NS elements"
+ changes_list = self.calculate_all_differences_to_deploy(
+ indata=indata,
+ nsr_id=nsr_id,
+ db_nsr=db_nsr,
+ db_vnfrs=db_vnfrs,
+ db_ro_nsr=db_ro_nsr,
+ db_nsr_update=db_nsr_update,
+ db_vnfrs_update=db_vnfrs_update,
+ action_id=action_id,
+ tasks_by_target_record_id=tasks_by_target_record_id,
)
-
- step = "process NS flavors"
- _process_items(
- target_list=indata.get("flavor") or [],
- existing_list=db_nsr.get("flavor") or [],
- db_record="nsrs:{}:flavor".format(nsr_id),
- db_update=db_nsr_update,
- db_path="flavor",
- item="flavor",
- process_params=Ns._process_flavor_params,
+ self.define_all_tasks(
+ changes_list=changes_list,
+ db_new_tasks=db_new_tasks,
+ tasks_by_target_record_id=tasks_by_target_record_id,
)
- # VNF.vld
- for vnfr_id, vnfr in db_vnfrs.items():
- # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
- step = "process VNF={} VLDs".format(vnfr_id)
- target_vnf = next(
- (
- vnf
- for vnf in indata.get("vnf", ())
- if vnf["_id"] == vnfr_id
- ),
- None,
- )
- target_list = target_vnf.get("vld") if target_vnf else None
- _process_items(
- target_list=target_list or [],
- existing_list=vnfr.get("vld") or [],
- db_record="vnfrs:{}:vld".format(vnfr_id),
- db_update=db_vnfrs_update[vnfr["_id"]],
- db_path="vld",
- item="net",
- process_params=Ns._process_net_params,
- )
-
- target_list = target_vnf.get("vdur") if target_vnf else None
- step = "process VNF={} VDUs".format(vnfr_id)
- _process_items(
- target_list=target_list or [],
- existing_list=vnfr.get("vdur") or [],
- db_record="vnfrs:{}:vdur".format(vnfr_id),
- db_update=db_vnfrs_update[vnfr["_id"]],
- db_path="vdur",
- item="vdu",
- process_params=Ns._process_vdu_params,
- )
-
- for db_task in db_new_tasks:
- step = "Updating database, Appending tasks to ro_tasks"
- target_id = db_task.pop("target_id")
- common_id = db_task.get("common_id")
-
- if common_id:
- if self.db.set_one(
- "ro_tasks",
- q_filter={
- "target_id": target_id,
- "tasks.common_id": common_id,
- },
- update_dict={"to_check_at": now, "modified_at": now},
- push={"tasks": db_task},
- fail_on_empty=False,
- ):
- continue
-
- if not self.db.set_one(
- "ro_tasks",
- q_filter={
- "target_id": target_id,
- "tasks.target_record": db_task["target_record"],
- },
- update_dict={"to_check_at": now, "modified_at": now},
- push={"tasks": db_task},
- fail_on_empty=False,
- ):
- # Create a ro_task
- step = "Updating database, Creating ro_tasks"
- db_ro_task = Ns._create_ro_task(target_id, db_task)
- nb_ro_tasks += 1
- self.db.create("ro_tasks", db_ro_task)
+ step = "Updating database, Appending tasks to ro_tasks"
+ self.upload_all_tasks(
+ db_new_tasks=db_new_tasks,
+ now=now,
+ )
step = "Updating database, nsrs"
if db_nsr_update:
self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
self.logger.debug(
- logging_text
- + "Exit. Created {} ro_tasks; {} tasks".format(
- nb_ro_tasks, len(db_new_tasks)
- )
+ logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
)
return (