X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns.py;h=e7291ea782c8394c4c3e7d638b955ea4f3d46210;hb=179e00207688285e282103994967b22f3b5852ca;hp=d187dd8469f2c22940df6d660204ab45aa03a8d1;hpb=0b1e734d5b51b23442aa903b4096bcf7bc48ec8d;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index d187dd84..e7291ea7 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -108,6 +108,20 @@ class Ns(object): self.next_worker = 0 self.plugins = {} self.workers = [] + self.process_params_function_map = { + "net": Ns._process_net_params, + "image": Ns._process_image_params, + "flavor": Ns._process_flavor_params, + "vdu": Ns._process_vdu_params, + "affinity-or-anti-affinity-group": Ns._process_affinity_group_params, + } + self.db_path_map = { + "net": "vld", + "image": "image", + "flavor": "flavor", + "vdu": "vdur", + "affinity-or-anti-affinity-group": "affinity-or-anti-affinity-group", + } def init_db(self, target_version): pass @@ -988,10 +1002,13 @@ class Ns(object): if deep_get( tasks_by_target_record_id, net_text, + "extra_dict", "params", "net_type", ): - tasks_by_target_record_id[net_text]["params"]["net_type"] = "data" + tasks_by_target_record_id[net_text]["extra_dict"]["params"][ + "net_type" + ] = "data" net_item["use"] = "data" net_item["model"] = interface["type"] @@ -1052,14 +1069,47 @@ class Ns(object): if ssh_keys: cloud_config["key-pairs"] = ssh_keys - disk_list = None + persistent_root_disk = {} + disk_list = [] + vnfd_id = vnfr["vnfd-id"] + vnfd = db.get_one("vnfds", {"_id": vnfd_id}) + for vdu in vnfd.get("vdu", ()): + if vdu["name"] == target_vdu["vdu-name"]: + for vsd in vnfd.get("virtual-storage-desc", ()): + if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]: + root_disk = vsd + if root_disk.get( + "type-of-storage" + ) == "persistent-storage:persistent-storage" and root_disk.get( + "size-of-storage" + ): + persistent_root_disk[vsd["id"]] = { + "image_id": vdu.get("sw-image-desc"), + "size": root_disk["size-of-storage"], + } + disk_list.append(persistent_root_disk[vsd["id"]]) + if target_vdu.get("virtual-storages"): - disk_list = [ - {"size": disk["size-of-storage"]} - for disk in target_vdu["virtual-storages"] - if disk.get("type-of-storage") - == "persistent-storage:persistent-storage" - ] + for disk in target_vdu["virtual-storages"]: + if ( + disk.get("type-of-storage") + == "persistent-storage:persistent-storage" + and disk["id"] not in persistent_root_disk.keys() + ): + disk_list.append({"size": disk["size-of-storage"]}) + + affinity_group_list = [] + + if target_vdu.get("affinity-or-anti-affinity-group-id"): + affinity_group = {} + for affinity_group_id in target_vdu["affinity-or-anti-affinity-group-id"]: + affinity_group_text = ( + ns_preffix + ":affinity-or-anti-affinity-group." + affinity_group_id + ) + + extra_dict["depends_on"].append(affinity_group_text) + affinity_group["affinity_group_id"] = "TASK-" + affinity_group_text + affinity_group_list.append(affinity_group) extra_dict["params"] = { "name": "{}-{}-{}-{}".format( @@ -1072,6 +1122,7 @@ class Ns(object): "start": True, "image_id": "TASK-" + image_text, "flavor_id": "TASK-" + flavor_text, + "affinity_group_list": affinity_group_list, "net_list": net_list, "cloud_config": cloud_config or None, "disk_list": disk_list, @@ -1081,6 +1132,412 @@ class Ns(object): return extra_dict + @staticmethod + def _process_affinity_group_params( + target_affinity_group: Dict[str, Any], + indata: Dict[str, Any], + vim_info: Dict[str, Any], + target_record_id: str, + **kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + """Get affinity or anti-affinity group parameters. + + Args: + target_affinity_group (Dict[str, Any]): [description] + indata (Dict[str, Any]): [description] + vim_info (Dict[str, Any]): [description] + target_record_id (str): [description] + + Returns: + Dict[str, Any]: [description] + """ + extra_dict = {} + + affinity_group_data = { + "name": target_affinity_group["name"], + "type": target_affinity_group["type"], + "scope": target_affinity_group["scope"], + } + + extra_dict["params"] = { + "affinity_group_data": affinity_group_data, + } + + return extra_dict + + def calculate_diff_items( + self, + indata, + db_nsr, + db_ro_nsr, + db_nsr_update, + item, + tasks_by_target_record_id, + action_id, + nsr_id, + task_index, + vnfr_id=None, + vnfr=None, + ): + """Function that returns the incremental changes (creation, deletion) + related to a specific item `item` to be done. This function should be + called for NS instantiation, NS termination, NS update to add a new VNF + or a new VLD, remove a VNF or VLD, etc. + Item can be `net, `flavor`, `image` or `vdu`. + It takes a list of target items from indata (which came from the REST API) + and compares with the existing items from db_ro_nsr, identifying the + incremental changes to be done. During the comparison, it calls the method + `process_params` (which was passed as parameter, and is particular for each + `item`) + + Args: + indata (Dict[str, Any]): deployment info + db_nsr: NSR record from DB + db_ro_nsr (Dict[str, Any]): record from "ro_nsrs" + db_nsr_update (Dict[str, Any]): NSR info to update in DB + item (str): element to process (net, vdu...) + tasks_by_target_record_id (Dict[str, Any]): + [, ] + action_id (str): action id + nsr_id (str): NSR id + task_index (number): task index to add to task name + vnfr_id (str): VNFR id + vnfr (Dict[str, Any]): VNFR info + + Returns: + List: list with the incremental changes (deletes, creates) for each item + number: current task index + """ + + diff_items = [] + db_path = "" + db_record = "" + target_list = [] + existing_list = [] + process_params = None + vdu2cloud_init = indata.get("cloud_init_content") or {} + ro_nsr_public_key = db_ro_nsr["public_key"] + + # According to the type of item, the path, the target_list, + # the existing_list and the method to process params are set + db_path = self.db_path_map[item] + process_params = self.process_params_function_map[item] + if item in ("net", "vdu"): + if vnfr is None: + db_record = "nsrs:{}:{}".format(nsr_id, db_path) + target_list = indata.get("ns", []).get(db_path, []) + existing_list = db_nsr.get(db_path, []) + else: + db_record = "vnfrs:{}:{}".format(vnfr_id, db_path) + target_vnf = next( + (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id), + None, + ) + target_list = target_vnf.get(db_path, []) if target_vnf else [] + existing_list = vnfr.get(db_path, []) + elif item in ("image", "flavor", "affinity-or-anti-affinity-group"): + db_record = "nsrs:{}:{}".format(nsr_id, db_path) + target_list = indata.get(item, []) + existing_list = db_nsr.get(item, []) + else: + raise NsException("Item not supported: {}", item) + + # ensure all the target_list elements has an "id". If not assign the index as id + if target_list is None: + target_list = [] + for target_index, tl in enumerate(target_list): + if tl and not tl.get("id"): + tl["id"] = str(target_index) + + # step 1 items (networks,vdus,...) to be deleted/updated + for item_index, existing_item in enumerate(existing_list): + target_item = next( + (t for t in target_list if t["id"] == existing_item["id"]), + None, + ) + + for target_vim, existing_viminfo in existing_item.get( + "vim_info", {} + ).items(): + if existing_viminfo is None: + continue + + if target_item: + target_viminfo = target_item.get("vim_info", {}).get(target_vim) + else: + target_viminfo = None + + if target_viminfo is None: + # must be deleted + self._assign_vim(target_vim) + target_record_id = "{}.{}".format(db_record, existing_item["id"]) + item_ = item + + if target_vim.startswith("sdn"): + # item must be sdn-net instead of net if target_vim is a sdn + item_ = "sdn_net" + target_record_id += ".sdn" + + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + diff_items.append( + { + "deployment_info": deployment_info, + "target_id": target_vim, + "item": item_, + "action": "DELETE", + "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}", + "target_record_id": target_record_id, + } + ) + task_index += 1 + + # step 2 items (networks,vdus,...) to be created + for target_item in target_list: + item_index = -1 + + for item_index, existing_item in enumerate(existing_list): + if existing_item["id"] == target_item["id"]: + break + else: + item_index += 1 + db_nsr_update[db_path + ".{}".format(item_index)] = target_item + existing_list.append(target_item) + existing_item = None + + for target_vim, target_viminfo in target_item.get("vim_info", {}).items(): + existing_viminfo = None + + if existing_item: + existing_viminfo = existing_item.get("vim_info", {}).get(target_vim) + + if existing_viminfo is not None: + continue + + target_record_id = "{}.{}".format(db_record, target_item["id"]) + item_ = item + + if target_vim.startswith("sdn"): + # item must be sdn-net instead of net if target_vim is a sdn + item_ = "sdn_net" + target_record_id += ".sdn" + + kwargs = {} + if process_params == Ns._process_vdu_params: + kwargs.update( + { + "vnfr_id": vnfr_id, + "nsr_id": nsr_id, + "vnfr": vnfr, + "vdu2cloud_init": vdu2cloud_init, + "tasks_by_target_record_id": tasks_by_target_record_id, + "logger": self.logger, + "db": self.db, + "fs": self.fs, + "ro_nsr_public_key": ro_nsr_public_key, + } + ) + + extra_dict = process_params( + target_item, + indata, + target_viminfo, + target_record_id, + **kwargs, + ) + self._assign_vim(target_vim) + + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + new_item = { + "deployment_info": deployment_info, + "target_id": target_vim, + "item": item_, + "action": "CREATE", + "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}", + "target_record_id": target_record_id, + "extra_dict": extra_dict, + "common_id": target_item.get("common_id", None), + } + diff_items.append(new_item) + tasks_by_target_record_id[target_record_id] = new_item + task_index += 1 + + db_nsr_update[db_path + ".{}".format(item_index)] = target_item + + return diff_items, task_index + + def calculate_all_differences_to_deploy( + self, + indata, + nsr_id, + db_nsr, + db_vnfrs, + db_ro_nsr, + db_nsr_update, + db_vnfrs_update, + action_id, + tasks_by_target_record_id, + ): + """This method calculates the ordered list of items (`changes_list`) + to be created and deleted. + + Args: + indata (Dict[str, Any]): deployment info + nsr_id (str): NSR id + db_nsr: NSR record from DB + db_vnfrs: VNFRS record from DB + db_ro_nsr (Dict[str, Any]): record from "ro_nsrs" + db_nsr_update (Dict[str, Any]): NSR info to update in DB + db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB + action_id (str): action id + tasks_by_target_record_id (Dict[str, Any]): + [, ] + + Returns: + List: ordered list of items to be created and deleted. + """ + + task_index = 0 + # set list with diffs: + changes_list = [] + + # NS vld, image and flavor + for item in ["net", "image", "flavor", "affinity-or-anti-affinity-group"]: + self.logger.debug("process NS={} {}".format(nsr_id, item)) + diff_items, task_index = self.calculate_diff_items( + indata=indata, + db_nsr=db_nsr, + db_ro_nsr=db_ro_nsr, + db_nsr_update=db_nsr_update, + item=item, + tasks_by_target_record_id=tasks_by_target_record_id, + action_id=action_id, + nsr_id=nsr_id, + task_index=task_index, + vnfr_id=None, + ) + changes_list += diff_items + + # VNF vlds and vdus + for vnfr_id, vnfr in db_vnfrs.items(): + # vnfr_id need to be set as global variable for among others nested method _process_vdu_params + for item in ["net", "vdu"]: + self.logger.debug("process VNF={} {}".format(vnfr_id, item)) + diff_items, task_index = self.calculate_diff_items( + indata=indata, + db_nsr=db_nsr, + db_ro_nsr=db_ro_nsr, + db_nsr_update=db_vnfrs_update[vnfr["_id"]], + item=item, + tasks_by_target_record_id=tasks_by_target_record_id, + action_id=action_id, + nsr_id=nsr_id, + task_index=task_index, + vnfr_id=vnfr_id, + vnfr=vnfr, + ) + changes_list += diff_items + + return changes_list + + def define_all_tasks( + self, + changes_list, + db_new_tasks, + tasks_by_target_record_id, + ): + """Function to create all the task structures obtanied from + the method calculate_all_differences_to_deploy + + Args: + changes_list (List): ordered list of items to be created or deleted + db_new_tasks (List): tasks list to be created + action_id (str): action id + tasks_by_target_record_id (Dict[str, Any]): + [, ] + + """ + + for change in changes_list: + task = Ns._create_task( + deployment_info=change["deployment_info"], + target_id=change["target_id"], + item=change["item"], + action=change["action"], + target_record=change["target_record"], + target_record_id=change["target_record_id"], + extra_dict=change.get("extra_dict", None), + ) + + tasks_by_target_record_id[change["target_record_id"]] = task + db_new_tasks.append(task) + + if change.get("common_id"): + task["common_id"] = change["common_id"] + + def upload_all_tasks( + self, + db_new_tasks, + now, + ): + """Function to save all tasks in the common DB + + Args: + db_new_tasks (List): tasks list to be created + now (time): current time + + """ + + nb_ro_tasks = 0 # for logging + + for db_task in db_new_tasks: + target_id = db_task.pop("target_id") + common_id = db_task.get("common_id") + + if common_id: + if self.db.set_one( + "ro_tasks", + q_filter={ + "target_id": target_id, + "tasks.common_id": common_id, + }, + update_dict={"to_check_at": now, "modified_at": now}, + push={"tasks": db_task}, + fail_on_empty=False, + ): + continue + + if not self.db.set_one( + "ro_tasks", + q_filter={ + "target_id": target_id, + "tasks.target_record": db_task["target_record"], + }, + update_dict={"to_check_at": now, "modified_at": now}, + push={"tasks": db_task}, + fail_on_empty=False, + ): + # Create a ro_task + self.logger.debug("Updating database, Creating ro_tasks") + db_ro_task = Ns._create_ro_task(target_id, db_task) + nb_ro_tasks += 1 + self.db.create("ro_tasks", db_ro_task) + + self.logger.debug( + "Created {} ro_tasks; {} tasks - db_new_tasks={}".format( + nb_ro_tasks, len(db_new_tasks), db_new_tasks + ) + ) + def deploy(self, session, indata, version, nsr_id, *args, **kwargs): self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata)) validate_input(indata, deploy_schema) @@ -1090,8 +1547,6 @@ class Ns(object): db_nsr_update = {} # update operation on nsrs db_vnfrs_update = {} db_vnfrs = {} # vnf's info indexed by _id - nb_ro_tasks = 0 # for logging - vdu2cloud_init = indata.get("cloud_init_content") or {} step = "" logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) self.logger.debug(logging_text + "Enter") @@ -1118,8 +1573,6 @@ class Ns(object): if not db_ro_nsr: db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now) - ro_nsr_public_key = db_ro_nsr["public_key"] - # check that action_id is not in the list of actions. Suffixed with :index if action_id in db_ro_nsr["actions"]: index = 1 @@ -1137,168 +1590,6 @@ class Ns(object): index += 1 - def _process_items( - target_list, - existing_list, - db_record, - db_update, - db_path, - item, - process_params, - ): - nonlocal db_new_tasks - nonlocal tasks_by_target_record_id - nonlocal action_id - nonlocal nsr_id - nonlocal task_index - nonlocal indata - - # ensure all the target_list elements has an "id". If not assign the index as id - for target_index, tl in enumerate(target_list): - if tl and not tl.get("id"): - tl["id"] = str(target_index) - - # step 1 items (networks,vdus,...) to be deleted/updated - for item_index, existing_item in enumerate(existing_list): - target_item = next( - (t for t in target_list if t["id"] == existing_item["id"]), None - ) - - for target_vim, existing_viminfo in existing_item.get( - "vim_info", {} - ).items(): - if existing_viminfo is None: - continue - - if target_item: - target_viminfo = target_item.get("vim_info", {}).get( - target_vim - ) - else: - target_viminfo = None - - if target_viminfo is None: - # must be deleted - self._assign_vim(target_vim) - target_record_id = "{}.{}".format( - db_record, existing_item["id"] - ) - item_ = item - - if target_vim.startswith("sdn"): - # item must be sdn-net instead of net if target_vim is a sdn - item_ = "sdn_net" - target_record_id += ".sdn" - - deployment_info = { - "action_id": action_id, - "nsr_id": nsr_id, - "task_index": task_index, - } - - task = Ns._create_task( - deployment_info=deployment_info, - target_id=target_vim, - item=item_, - action="DELETE", - target_record=f"{db_record}.{item_index}.vim_info.{target_vim}", - target_record_id=target_record_id, - ) - - task_index = deployment_info.get("task_index") - - tasks_by_target_record_id[target_record_id] = task - db_new_tasks.append(task) - # TODO delete - # TODO check one by one the vims to be created/deleted - - # step 2 items (networks,vdus,...) to be created - for target_item in target_list: - item_index = -1 - - for item_index, existing_item in enumerate(existing_list): - if existing_item["id"] == target_item["id"]: - break - else: - item_index += 1 - db_update[db_path + ".{}".format(item_index)] = target_item - existing_list.append(target_item) - existing_item = None - - for target_vim, target_viminfo in target_item.get( - "vim_info", {} - ).items(): - existing_viminfo = None - - if existing_item: - existing_viminfo = existing_item.get("vim_info", {}).get( - target_vim - ) - - # TODO check if different. Delete and create??? - # TODO delete if not exist - if existing_viminfo is not None: - continue - - target_record_id = "{}.{}".format(db_record, target_item["id"]) - item_ = item - - if target_vim.startswith("sdn"): - # item must be sdn-net instead of net if target_vim is a sdn - item_ = "sdn_net" - target_record_id += ".sdn" - - kwargs = {} - if process_params == Ns._process_vdu_params: - kwargs.update( - { - "vnfr_id": vnfr_id, - "nsr_id": nsr_id, - "vnfr": vnfr, - "vdu2cloud_init": vdu2cloud_init, - "tasks_by_target_record_id": tasks_by_target_record_id, - "logger": self.logger, - "db": self.db, - "fs": self.fs, - "ro_nsr_public_key": ro_nsr_public_key, - } - ) - - extra_dict = process_params( - target_item, - indata, - target_viminfo, - target_record_id, - **kwargs, - ) - self._assign_vim(target_vim) - - deployment_info = { - "action_id": action_id, - "nsr_id": nsr_id, - "task_index": task_index, - } - - task = Ns._create_task( - deployment_info=deployment_info, - target_id=target_vim, - item=item_, - action="CREATE", - target_record=f"{db_record}.{item_index}.vim_info.{target_vim}", - target_record_id=target_record_id, - extra_dict=extra_dict, - ) - - task_index = deployment_info.get("task_index") - - tasks_by_target_record_id[target_record_id] = task - db_new_tasks.append(task) - - if target_item.get("common_id"): - task["common_id"] = target_item["common_id"] - - db_update[db_path + ".{}".format(item_index)] = target_item - def _process_action(indata): nonlocal db_new_tasks nonlocal action_id @@ -1384,108 +1675,30 @@ class Ns(object): _process_action(indata) else: # compute network differences - # NS.vld - step = "process NS VLDs" - _process_items( - target_list=indata["ns"]["vld"] or [], - existing_list=db_nsr.get("vld") or [], - db_record="nsrs:{}:vld".format(nsr_id), - db_update=db_nsr_update, - db_path="vld", - item="net", - process_params=Ns._process_net_params, + # NS + step = "process NS elements" + changes_list = self.calculate_all_differences_to_deploy( + indata=indata, + nsr_id=nsr_id, + db_nsr=db_nsr, + db_vnfrs=db_vnfrs, + db_ro_nsr=db_ro_nsr, + db_nsr_update=db_nsr_update, + db_vnfrs_update=db_vnfrs_update, + action_id=action_id, + tasks_by_target_record_id=tasks_by_target_record_id, ) - - step = "process NS images" - _process_items( - target_list=indata.get("image") or [], - existing_list=db_nsr.get("image") or [], - db_record="nsrs:{}:image".format(nsr_id), - db_update=db_nsr_update, - db_path="image", - item="image", - process_params=Ns._process_image_params, + self.define_all_tasks( + changes_list=changes_list, + db_new_tasks=db_new_tasks, + tasks_by_target_record_id=tasks_by_target_record_id, ) - step = "process NS flavors" - _process_items( - target_list=indata.get("flavor") or [], - existing_list=db_nsr.get("flavor") or [], - db_record="nsrs:{}:flavor".format(nsr_id), - db_update=db_nsr_update, - db_path="flavor", - item="flavor", - process_params=Ns._process_flavor_params, - ) - - # VNF.vld - for vnfr_id, vnfr in db_vnfrs.items(): - # vnfr_id need to be set as global variable for among others nested method _process_vdu_params - step = "process VNF={} VLDs".format(vnfr_id) - target_vnf = next( - ( - vnf - for vnf in indata.get("vnf", ()) - if vnf["_id"] == vnfr_id - ), - None, - ) - target_list = target_vnf.get("vld") if target_vnf else None - _process_items( - target_list=target_list or [], - existing_list=vnfr.get("vld") or [], - db_record="vnfrs:{}:vld".format(vnfr_id), - db_update=db_vnfrs_update[vnfr["_id"]], - db_path="vld", - item="net", - process_params=Ns._process_net_params, - ) - - target_list = target_vnf.get("vdur") if target_vnf else None - step = "process VNF={} VDUs".format(vnfr_id) - _process_items( - target_list=target_list or [], - existing_list=vnfr.get("vdur") or [], - db_record="vnfrs:{}:vdur".format(vnfr_id), - db_update=db_vnfrs_update[vnfr["_id"]], - db_path="vdur", - item="vdu", - process_params=Ns._process_vdu_params, - ) - - for db_task in db_new_tasks: - step = "Updating database, Appending tasks to ro_tasks" - target_id = db_task.pop("target_id") - common_id = db_task.get("common_id") - - if common_id: - if self.db.set_one( - "ro_tasks", - q_filter={ - "target_id": target_id, - "tasks.common_id": common_id, - }, - update_dict={"to_check_at": now, "modified_at": now}, - push={"tasks": db_task}, - fail_on_empty=False, - ): - continue - - if not self.db.set_one( - "ro_tasks", - q_filter={ - "target_id": target_id, - "tasks.target_record": db_task["target_record"], - }, - update_dict={"to_check_at": now, "modified_at": now}, - push={"tasks": db_task}, - fail_on_empty=False, - ): - # Create a ro_task - step = "Updating database, Creating ro_tasks" - db_ro_task = Ns._create_ro_task(target_id, db_task) - nb_ro_tasks += 1 - self.db.create("ro_tasks", db_ro_task) + step = "Updating database, Appending tasks to ro_tasks" + self.upload_all_tasks( + db_new_tasks=db_new_tasks, + now=now, + ) step = "Updating database, nsrs" if db_nsr_update: @@ -1497,10 +1710,7 @@ class Ns(object): self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update) self.logger.debug( - logging_text - + "Exit. Created {} ro_tasks; {} tasks".format( - nb_ro_tasks, len(db_new_tasks) - ) + logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) ) return (