From a1c2b40a68be5591cf36775aa48bc965116ae340 Mon Sep 17 00:00:00 2001 From: gallardo Date: Fri, 11 Feb 2022 12:41:59 +0000 Subject: [PATCH] Refactor method deploy This change removes the method `_process_items` and refactors the method `deploy` to do the following tasks: - `calculate_all_differences_to_deploy`: returns the list of items to be created and deleted - `define_all_tasks`: creates all the task structures - `upload_all_tasks`: saves all tasks in the common DB Change-Id: I5cb59dd697875634a4a92df8673c9842a4578d5f Signed-off-by: gallardo --- NG-RO/osm_ng_ro/ns.py | 676 +++++++++++++++++++++++++----------------- 1 file changed, 407 insertions(+), 269 deletions(-) diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index d187dd84..f3b073e1 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -108,6 +108,18 @@ class Ns(object): self.next_worker = 0 self.plugins = {} self.workers = [] + self.process_params_function_map = { + "net": Ns._process_net_params, + "image": Ns._process_image_params, + "flavor": Ns._process_flavor_params, + "vdu": Ns._process_vdu_params, + } + self.db_path_map = { + "net": "vld", + "image": "image", + "flavor": "flavor", + "vdu": "vdur", + } def init_db(self, target_version): pass @@ -1081,6 +1093,379 @@ class Ns(object): return extra_dict + def calculate_diff_items( + self, + indata, + db_nsr, + db_ro_nsr, + db_nsr_update, + item, + tasks_by_target_record_id, + action_id, + nsr_id, + task_index, + vnfr_id=None, + vnfr=None, + ): + """Function that returns the incremental changes (creation, deletion) + related to a specific item `item` to be done. This function should be + called for NS instantiation, NS termination, NS update to add a new VNF + or a new VLD, remove a VNF or VLD, etc. + Item can be `net, `flavor`, `image` or `vdu`. + It takes a list of target items from indata (which came from the REST API) + and compares with the existing items from db_ro_nsr, identifying the + incremental changes to be done. During the comparison, it calls the method + `process_params` (which was passed as parameter, and is particular for each + `item`) + + Args: + indata (Dict[str, Any]): deployment info + db_nsr: NSR record from DB + db_ro_nsr (Dict[str, Any]): record from "ro_nsrs" + db_nsr_update (Dict[str, Any]): NSR info to update in DB + item (str): element to process (net, vdu...) + tasks_by_target_record_id (Dict[str, Any]): + [, ] + action_id (str): action id + nsr_id (str): NSR id + task_index (number): task index to add to task name + vnfr_id (str): VNFR id + vnfr (Dict[str, Any]): VNFR info + + Returns: + List: list with the incremental changes (deletes, creates) for each item + number: current task index + """ + + diff_items = [] + db_path = "" + db_record = "" + target_list = [] + existing_list = [] + process_params = None + vdu2cloud_init = indata.get("cloud_init_content") or {} + ro_nsr_public_key = db_ro_nsr["public_key"] + + # According to the type of item, the path, the target_list, + # the existing_list and the method to process params are set + db_path = self.db_path_map[item] + process_params = self.process_params_function_map[item] + if item in ("net", "vdu"): + if vnfr is None: + db_record = "nsrs:{}:{}".format(nsr_id, db_path) + target_list = indata.get("ns", []).get(db_path, []) + existing_list = db_nsr.get(db_path, []) + else: + db_record = "vnfrs:{}:{}".format(vnfr_id, db_path) + target_vnf = next( + (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id), + None, + ) + target_list = target_vnf.get(db_path, []) if target_vnf else [] + existing_list = vnfr.get(db_path, []) + elif item in ("image", "flavor"): + db_record = "nsrs:{}:{}".format(nsr_id, db_path) + target_list = indata.get(item, []) + existing_list = db_nsr.get(item, []) + else: + raise NsException("Item not supported: {}", item) + + # ensure all the target_list elements has an "id". If not assign the index as id + if target_list is None: + target_list = [] + for target_index, tl in enumerate(target_list): + if tl and not tl.get("id"): + tl["id"] = str(target_index) + + # step 1 items (networks,vdus,...) to be deleted/updated + for item_index, existing_item in enumerate(existing_list): + target_item = next( + (t for t in target_list if t["id"] == existing_item["id"]), + None, + ) + + for target_vim, existing_viminfo in existing_item.get( + "vim_info", {} + ).items(): + if existing_viminfo is None: + continue + + if target_item: + target_viminfo = target_item.get("vim_info", {}).get(target_vim) + else: + target_viminfo = None + + if target_viminfo is None: + # must be deleted + self._assign_vim(target_vim) + target_record_id = "{}.{}".format(db_record, existing_item["id"]) + item_ = item + + if target_vim.startswith("sdn"): + # item must be sdn-net instead of net if target_vim is a sdn + item_ = "sdn_net" + target_record_id += ".sdn" + + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + diff_items.append( + { + "deployment_info": deployment_info, + "target_id": target_vim, + "item": item_, + "action": "DELETE", + "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}", + "target_record_id": target_record_id, + } + ) + task_index += 1 + + # step 2 items (networks,vdus,...) to be created + for target_item in target_list: + item_index = -1 + + for item_index, existing_item in enumerate(existing_list): + if existing_item["id"] == target_item["id"]: + break + else: + item_index += 1 + db_nsr_update[db_path + ".{}".format(item_index)] = target_item + existing_list.append(target_item) + existing_item = None + + for target_vim, target_viminfo in target_item.get("vim_info", {}).items(): + existing_viminfo = None + + if existing_item: + existing_viminfo = existing_item.get("vim_info", {}).get(target_vim) + + if existing_viminfo is not None: + continue + + target_record_id = "{}.{}".format(db_record, target_item["id"]) + item_ = item + + if target_vim.startswith("sdn"): + # item must be sdn-net instead of net if target_vim is a sdn + item_ = "sdn_net" + target_record_id += ".sdn" + + kwargs = {} + if process_params == Ns._process_vdu_params: + kwargs.update( + { + "vnfr_id": vnfr_id, + "nsr_id": nsr_id, + "vnfr": vnfr, + "vdu2cloud_init": vdu2cloud_init, + "tasks_by_target_record_id": tasks_by_target_record_id, + "logger": self.logger, + "db": self.db, + "fs": self.fs, + "ro_nsr_public_key": ro_nsr_public_key, + } + ) + + extra_dict = process_params( + target_item, + indata, + target_viminfo, + target_record_id, + **kwargs, + ) + self._assign_vim(target_vim) + + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + diff_items.append( + { + "deployment_info": deployment_info, + "target_id": target_vim, + "item": item_, + "action": "CREATE", + "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}", + "target_record_id": target_record_id, + "extra_dict": extra_dict, + "common_id": target_item.get("common_id", None), + } + ) + task_index += 1 + + db_nsr_update[db_path + ".{}".format(item_index)] = target_item + + return diff_items, task_index + + def calculate_all_differences_to_deploy( + self, + indata, + nsr_id, + db_nsr, + db_vnfrs, + db_ro_nsr, + db_nsr_update, + db_vnfrs_update, + action_id, + tasks_by_target_record_id, + ): + """This method calculates the ordered list of items (`changes_list`) + to be created and deleted. + + Args: + indata (Dict[str, Any]): deployment info + nsr_id (str): NSR id + db_nsr: NSR record from DB + db_vnfrs: VNFRS record from DB + db_ro_nsr (Dict[str, Any]): record from "ro_nsrs" + db_nsr_update (Dict[str, Any]): NSR info to update in DB + db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB + action_id (str): action id + tasks_by_target_record_id (Dict[str, Any]): + [, ] + + Returns: + List: ordered list of items to be created and deleted. + """ + + task_index = 0 + # set list with diffs: + changes_list = [] + + # NS vld, image and flavor + for item in ["net", "image", "flavor"]: + self.logger.debug("process NS={} {}".format(nsr_id, item)) + diff_items, task_index = self.calculate_diff_items( + indata=indata, + db_nsr=db_nsr, + db_ro_nsr=db_ro_nsr, + db_nsr_update=db_nsr_update, + item=item, + tasks_by_target_record_id=tasks_by_target_record_id, + action_id=action_id, + nsr_id=nsr_id, + task_index=task_index, + vnfr_id=None, + ) + changes_list += diff_items + + # VNF vlds and vdus + for vnfr_id, vnfr in db_vnfrs.items(): + # vnfr_id need to be set as global variable for among others nested method _process_vdu_params + for item in ["net", "vdu"]: + self.logger.debug("process VNF={} {}".format(vnfr_id, item)) + diff_items, task_index = self.calculate_diff_items( + indata=indata, + db_nsr=db_nsr, + db_ro_nsr=db_ro_nsr, + db_nsr_update=db_vnfrs_update[vnfr["_id"]], + item=item, + tasks_by_target_record_id=tasks_by_target_record_id, + action_id=action_id, + nsr_id=nsr_id, + task_index=task_index, + vnfr_id=vnfr_id, + vnfr=vnfr, + ) + changes_list += diff_items + + return changes_list + + def define_all_tasks( + self, + changes_list, + db_new_tasks, + tasks_by_target_record_id, + ): + """Function to create all the task structures obtanied from + the method calculate_all_differences_to_deploy + + Args: + changes_list (List): ordered list of items to be created or deleted + db_new_tasks (List): tasks list to be created + action_id (str): action id + tasks_by_target_record_id (Dict[str, Any]): + [, ] + + """ + + for change in changes_list: + task = Ns._create_task( + deployment_info=change["deployment_info"], + target_id=change["target_id"], + item=change["item"], + action=change["action"], + target_record=change["target_record"], + target_record_id=change["target_record_id"], + extra_dict=change.get("extra_dict", None), + ) + + tasks_by_target_record_id[change["target_record_id"]] = task + db_new_tasks.append(task) + + if change.get("common_id"): + task["common_id"] = change["common_id"] + + def upload_all_tasks( + self, + db_new_tasks, + now, + ): + """Function to save all tasks in the common DB + + Args: + db_new_tasks (List): tasks list to be created + now (time): current time + + """ + + nb_ro_tasks = 0 # for logging + + for db_task in db_new_tasks: + target_id = db_task.pop("target_id") + common_id = db_task.get("common_id") + + if common_id: + if self.db.set_one( + "ro_tasks", + q_filter={ + "target_id": target_id, + "tasks.common_id": common_id, + }, + update_dict={"to_check_at": now, "modified_at": now}, + push={"tasks": db_task}, + fail_on_empty=False, + ): + continue + + if not self.db.set_one( + "ro_tasks", + q_filter={ + "target_id": target_id, + "tasks.target_record": db_task["target_record"], + }, + update_dict={"to_check_at": now, "modified_at": now}, + push={"tasks": db_task}, + fail_on_empty=False, + ): + # Create a ro_task + self.logger.debug("Updating database, Creating ro_tasks") + db_ro_task = Ns._create_ro_task(target_id, db_task) + nb_ro_tasks += 1 + self.db.create("ro_tasks", db_ro_task) + + self.logger.debug( + "Created {} ro_tasks; {} tasks - db_new_tasks={}".format( + nb_ro_tasks, len(db_new_tasks), db_new_tasks + ) + ) + def deploy(self, session, indata, version, nsr_id, *args, **kwargs): self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata)) validate_input(indata, deploy_schema) @@ -1090,8 +1475,6 @@ class Ns(object): db_nsr_update = {} # update operation on nsrs db_vnfrs_update = {} db_vnfrs = {} # vnf's info indexed by _id - nb_ro_tasks = 0 # for logging - vdu2cloud_init = indata.get("cloud_init_content") or {} step = "" logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) self.logger.debug(logging_text + "Enter") @@ -1118,8 +1501,6 @@ class Ns(object): if not db_ro_nsr: db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now) - ro_nsr_public_key = db_ro_nsr["public_key"] - # check that action_id is not in the list of actions. Suffixed with :index if action_id in db_ro_nsr["actions"]: index = 1 @@ -1137,168 +1518,6 @@ class Ns(object): index += 1 - def _process_items( - target_list, - existing_list, - db_record, - db_update, - db_path, - item, - process_params, - ): - nonlocal db_new_tasks - nonlocal tasks_by_target_record_id - nonlocal action_id - nonlocal nsr_id - nonlocal task_index - nonlocal indata - - # ensure all the target_list elements has an "id". If not assign the index as id - for target_index, tl in enumerate(target_list): - if tl and not tl.get("id"): - tl["id"] = str(target_index) - - # step 1 items (networks,vdus,...) to be deleted/updated - for item_index, existing_item in enumerate(existing_list): - target_item = next( - (t for t in target_list if t["id"] == existing_item["id"]), None - ) - - for target_vim, existing_viminfo in existing_item.get( - "vim_info", {} - ).items(): - if existing_viminfo is None: - continue - - if target_item: - target_viminfo = target_item.get("vim_info", {}).get( - target_vim - ) - else: - target_viminfo = None - - if target_viminfo is None: - # must be deleted - self._assign_vim(target_vim) - target_record_id = "{}.{}".format( - db_record, existing_item["id"] - ) - item_ = item - - if target_vim.startswith("sdn"): - # item must be sdn-net instead of net if target_vim is a sdn - item_ = "sdn_net" - target_record_id += ".sdn" - - deployment_info = { - "action_id": action_id, - "nsr_id": nsr_id, - "task_index": task_index, - } - - task = Ns._create_task( - deployment_info=deployment_info, - target_id=target_vim, - item=item_, - action="DELETE", - target_record=f"{db_record}.{item_index}.vim_info.{target_vim}", - target_record_id=target_record_id, - ) - - task_index = deployment_info.get("task_index") - - tasks_by_target_record_id[target_record_id] = task - db_new_tasks.append(task) - # TODO delete - # TODO check one by one the vims to be created/deleted - - # step 2 items (networks,vdus,...) to be created - for target_item in target_list: - item_index = -1 - - for item_index, existing_item in enumerate(existing_list): - if existing_item["id"] == target_item["id"]: - break - else: - item_index += 1 - db_update[db_path + ".{}".format(item_index)] = target_item - existing_list.append(target_item) - existing_item = None - - for target_vim, target_viminfo in target_item.get( - "vim_info", {} - ).items(): - existing_viminfo = None - - if existing_item: - existing_viminfo = existing_item.get("vim_info", {}).get( - target_vim - ) - - # TODO check if different. Delete and create??? - # TODO delete if not exist - if existing_viminfo is not None: - continue - - target_record_id = "{}.{}".format(db_record, target_item["id"]) - item_ = item - - if target_vim.startswith("sdn"): - # item must be sdn-net instead of net if target_vim is a sdn - item_ = "sdn_net" - target_record_id += ".sdn" - - kwargs = {} - if process_params == Ns._process_vdu_params: - kwargs.update( - { - "vnfr_id": vnfr_id, - "nsr_id": nsr_id, - "vnfr": vnfr, - "vdu2cloud_init": vdu2cloud_init, - "tasks_by_target_record_id": tasks_by_target_record_id, - "logger": self.logger, - "db": self.db, - "fs": self.fs, - "ro_nsr_public_key": ro_nsr_public_key, - } - ) - - extra_dict = process_params( - target_item, - indata, - target_viminfo, - target_record_id, - **kwargs, - ) - self._assign_vim(target_vim) - - deployment_info = { - "action_id": action_id, - "nsr_id": nsr_id, - "task_index": task_index, - } - - task = Ns._create_task( - deployment_info=deployment_info, - target_id=target_vim, - item=item_, - action="CREATE", - target_record=f"{db_record}.{item_index}.vim_info.{target_vim}", - target_record_id=target_record_id, - extra_dict=extra_dict, - ) - - task_index = deployment_info.get("task_index") - - tasks_by_target_record_id[target_record_id] = task - db_new_tasks.append(task) - - if target_item.get("common_id"): - task["common_id"] = target_item["common_id"] - - db_update[db_path + ".{}".format(item_index)] = target_item - def _process_action(indata): nonlocal db_new_tasks nonlocal action_id @@ -1384,108 +1603,30 @@ class Ns(object): _process_action(indata) else: # compute network differences - # NS.vld - step = "process NS VLDs" - _process_items( - target_list=indata["ns"]["vld"] or [], - existing_list=db_nsr.get("vld") or [], - db_record="nsrs:{}:vld".format(nsr_id), - db_update=db_nsr_update, - db_path="vld", - item="net", - process_params=Ns._process_net_params, - ) - - step = "process NS images" - _process_items( - target_list=indata.get("image") or [], - existing_list=db_nsr.get("image") or [], - db_record="nsrs:{}:image".format(nsr_id), - db_update=db_nsr_update, - db_path="image", - item="image", - process_params=Ns._process_image_params, + # NS + step = "process NS elements" + changes_list = self.calculate_all_differences_to_deploy( + indata=indata, + nsr_id=nsr_id, + db_nsr=db_nsr, + db_vnfrs=db_vnfrs, + db_ro_nsr=db_ro_nsr, + db_nsr_update=db_nsr_update, + db_vnfrs_update=db_vnfrs_update, + action_id=action_id, + tasks_by_target_record_id=tasks_by_target_record_id, ) - - step = "process NS flavors" - _process_items( - target_list=indata.get("flavor") or [], - existing_list=db_nsr.get("flavor") or [], - db_record="nsrs:{}:flavor".format(nsr_id), - db_update=db_nsr_update, - db_path="flavor", - item="flavor", - process_params=Ns._process_flavor_params, + self.define_all_tasks( + changes_list=changes_list, + db_new_tasks=db_new_tasks, + tasks_by_target_record_id=tasks_by_target_record_id, ) - # VNF.vld - for vnfr_id, vnfr in db_vnfrs.items(): - # vnfr_id need to be set as global variable for among others nested method _process_vdu_params - step = "process VNF={} VLDs".format(vnfr_id) - target_vnf = next( - ( - vnf - for vnf in indata.get("vnf", ()) - if vnf["_id"] == vnfr_id - ), - None, - ) - target_list = target_vnf.get("vld") if target_vnf else None - _process_items( - target_list=target_list or [], - existing_list=vnfr.get("vld") or [], - db_record="vnfrs:{}:vld".format(vnfr_id), - db_update=db_vnfrs_update[vnfr["_id"]], - db_path="vld", - item="net", - process_params=Ns._process_net_params, - ) - - target_list = target_vnf.get("vdur") if target_vnf else None - step = "process VNF={} VDUs".format(vnfr_id) - _process_items( - target_list=target_list or [], - existing_list=vnfr.get("vdur") or [], - db_record="vnfrs:{}:vdur".format(vnfr_id), - db_update=db_vnfrs_update[vnfr["_id"]], - db_path="vdur", - item="vdu", - process_params=Ns._process_vdu_params, - ) - - for db_task in db_new_tasks: - step = "Updating database, Appending tasks to ro_tasks" - target_id = db_task.pop("target_id") - common_id = db_task.get("common_id") - - if common_id: - if self.db.set_one( - "ro_tasks", - q_filter={ - "target_id": target_id, - "tasks.common_id": common_id, - }, - update_dict={"to_check_at": now, "modified_at": now}, - push={"tasks": db_task}, - fail_on_empty=False, - ): - continue - - if not self.db.set_one( - "ro_tasks", - q_filter={ - "target_id": target_id, - "tasks.target_record": db_task["target_record"], - }, - update_dict={"to_check_at": now, "modified_at": now}, - push={"tasks": db_task}, - fail_on_empty=False, - ): - # Create a ro_task - step = "Updating database, Creating ro_tasks" - db_ro_task = Ns._create_ro_task(target_id, db_task) - nb_ro_tasks += 1 - self.db.create("ro_tasks", db_ro_task) + step = "Updating database, Appending tasks to ro_tasks" + self.upload_all_tasks( + db_new_tasks=db_new_tasks, + now=now, + ) step = "Updating database, nsrs" if db_nsr_update: @@ -1497,10 +1638,7 @@ class Ns(object): self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update) self.logger.debug( - logging_text - + "Exit. Created {} ro_tasks; {} tasks".format( - nb_ro_tasks, len(db_new_tasks) - ) + logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) ) return ( -- 2.17.1