X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns.py;h=3f14c587538ee0f570e667347efba4765a226173;hp=68d2a721e3a67046ee04fa0539633e71a510a9c5;hb=78f474e69fde9d64e8716978b5ea38f9f5aace48;hpb=8f2060bb4e1d8eb2f522f09f0970d7b24dfd9ce9

diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py
index 68d2a721..3f14c587 100644
--- a/NG-RO/osm_ng_ro/ns.py
+++ b/NG-RO/osm_ng_ro/ns.py
@@ -19,8 +19,6 @@ from http import HTTPStatus
 
 import logging
 from random import choice as random_choice
-
-import yaml
 from threading import Lock
 from time import time
 from traceback import format_exc as traceback_format_exc
@@ -51,6 +49,7 @@ from osm_common.fsbase import FsBase, FsException
 from osm_common.msgbase import MsgException
 from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
 from osm_ng_ro.validation import deploy_schema, validate_input
+import yaml
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 min_common_version = "0.1.16"
@@ -509,6 +508,7 @@ class Ns(object):
                 "vim_name": None,
                 "vim_status": None,
                 "vim_details": None,
+                "vim_message": None,
                 "refresh_at": None,
             },
             "modified_at": now,
@@ -958,6 +958,37 @@ class Ns(object):
         extra_dict = {"depends_on": [image_text, flavor_text]}
         net_list = []
 
+        # If the position info is provided for all the interfaces, they will be sorted
+        # by position number in ascending order.
+        if all(i.get("position") for i in target_vdu["interfaces"]):
+            sorted_interfaces = sorted(
+                target_vdu["interfaces"],
+                key=lambda x: (x.get("position") is None, x.get("position")),
+            )
+            target_vdu["interfaces"] = sorted_interfaces
+
+        # If the position info is provided only for some interfaces, the interfaces that have
+        # a specific position number are placed at that position; the order of the rest is not guaranteed.
+        else:
+            if any(i.get("position") for i in target_vdu["interfaces"]):
+                n = len(target_vdu["interfaces"])
+                sorted_interfaces = [-1] * n
+                k, m = 0, 0
+                while k < n:
+                    if target_vdu["interfaces"][k].get("position"):
+                        idx = target_vdu["interfaces"][k]["position"]
+                        sorted_interfaces[idx - 1] = target_vdu["interfaces"][k]
+                    k += 1
+                while m < n:
+                    if not target_vdu["interfaces"][m].get("position"):
+                        idy = sorted_interfaces.index(-1)
+                        sorted_interfaces[idy] = target_vdu["interfaces"][m]
+                    m += 1
+
+                target_vdu["interfaces"] = sorted_interfaces
+
+        # If the position info is not provided for the interfaces, they will be attached
+        # in the order given in the VNFD.
         for iface_index, interface in enumerate(target_vdu["interfaces"]):
             if interface.get("ns-vld-id"):
                 net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
@@ -1195,7 +1226,7 @@ class Ns(object):
         """
         vnfr = kwargs.get("vnfr")
         vdu2cloud_init = kwargs.get("vdu2cloud_init")
-        #logger = kwargs.get("logger")
+        # logger = kwargs.get("logger")
         db = kwargs.get("db")
         fs = kwargs.get("fs")
         ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
@@ -1230,9 +1261,15 @@ class Ns(object):
                     "floating_ip",
                 )
             }
-            existing_ifaces = existing_vdu["vim_info"][target_id].get("interfaces", [])
+            existing_ifaces = existing_vdu["vim_info"][target_id].get(
+                "interfaces_backup", []
+            )
             net_id = next(
-                (i["vim_net_id"] for i in existing_ifaces if i["ip_address"] == interface["ip-address"]),
+                (
+                    i["vim_net_id"]
+                    for i in existing_ifaces
+                    if i["ip_address"] == interface["ip-address"]
+                ),
                 None,
             )
 
@@ -1311,8 +1348,13 @@ class Ns(object):
         affinity_group = {}
         for affinity_group_id in existing_vdu["affinity-or-anti-affinity-group-id"]:
             for group in db_nsr.get("affinity-or-anti-affinity-group"):
-                if group["id"] == affinity_group_id and group["vim_info"][target_id].get("vim_id", None) is not None:
-                    affinity_group["affinity_group_id"] = group["vim_info"][target_id].get("vim_id", None)
+                if (
+                    group["id"] == affinity_group_id
+                    and group["vim_info"][target_id].get("vim_id", None) is not None
+                ):
+                    affinity_group["affinity_group_id"] = group["vim_info"][
+                        target_id
+                    ].get("vim_id", None)
                     affinity_group_list.append(affinity_group)
 
         extra_dict["params"] = {
@@ -1658,9 +1700,7 @@ class Ns(object):
                     extra_dict=change.get("extra_dict", None),
                 )
 
-                self.logger.warning(
-                    "ns.define_all_tasks task={}".format(task)
-                )
+                self.logger.warning("ns.define_all_tasks task={}".format(task))
                 tasks_by_target_record_id[change["target_record_id"]] = task
                 db_new_tasks.append(task)
 
@@ -1686,12 +1726,17 @@ class Ns(object):
             target_id = db_task.pop("target_id")
             common_id = db_task.get("common_id")
 
+            # Do not check tasks with vim_status DELETED
+            # because in manual healing there are two tasks for the same vdur:
+            # one with vim_status deleted and the other one with the actual VM status.
+
             if common_id:
                 if self.db.set_one(
                     "ro_tasks",
                     q_filter={
                         "target_id": target_id,
                         "tasks.common_id": common_id,
+                        "vim_info.vim_status.ne": "DELETED",
                     },
                     update_dict={"to_check_at": now, "modified_at": now},
                     push={"tasks": db_task},
@@ -1704,6 +1749,7 @@ class Ns(object):
                     q_filter={
                         "target_id": target_id,
                         "tasks.target_record": db_task["target_record"],
+                        "vim_info.vim_status.ne": "DELETED",
                     },
                     update_dict={"to_check_at": now, "modified_at": now},
                     push={"tasks": db_task},
@@ -1746,11 +1792,16 @@ class Ns(object):
             self.logger.debug("Updating database, Creating ro_tasks")
             db_ro_task = Ns._create_ro_task(target_id, db_task)
 
-            # If DELETE task: the associated created items shoud be removed
+            # If DELETE task: the associated created items should be removed
             # (except persistent volumes):
             if action == "DELETE":
                 db_ro_task["vim_info"]["created"] = True
-                db_ro_task["vim_info"]["created_items"] = db_task.get("created_items", {})
+                db_ro_task["vim_info"]["created_items"] = db_task.get(
+                    "created_items", {}
+                )
+                db_ro_task["vim_info"]["volumes_to_hold"] = db_task.get(
+                    "volumes_to_hold", []
+                )
                 db_ro_task["vim_info"]["vim_id"] = db_task.get("vim_id", None)
 
             nb_ro_tasks += 1
@@ -1765,16 +1816,23 @@ class Ns(object):
 
     def _prepare_created_items_for_healing(
         self,
-        target_id,
-        existing_vdu,
+        nsr_id,
+        target_record,
     ):
-        # Only ports are considered because created volumes are persistent
-        ports_list = {}
-        vim_interfaces = existing_vdu["vim_info"][target_id].get("interfaces", [])
-        for iface in vim_interfaces:
-            ports_list["port:" + iface["vim_interface_id"]] = True
+        created_items = {}
+        # Get created_items from ro_task
+        ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+        for ro_task in ro_tasks:
+            for task in ro_task["tasks"]:
+                if (
+                    task["target_record"] == target_record
+                    and task["action"] == "CREATE"
+                    and ro_task["vim_info"]["created_items"]
+                ):
+                    created_items = ro_task["vim_info"]["created_items"]
+                    break
 
-        return ports_list
+        return created_items
 
     def _prepare_persistent_volumes_for_healing(
         self,
@@ -1837,7 +1895,10 @@ class Ns(object):
         item_index = 0
         existing_instance = None
         for instance in existing_vnf.get("vdur", None):
-            if (instance["vdu-name"] == vdu_name and instance["count-index"] == count_index):
+            if (
+                instance["vdu-name"] == vdu_name
+                and instance["count-index"] == count_index
+            ):
                 existing_instance = instance
                 break
             else:
@@ -1859,7 +1920,7 @@ class Ns(object):
         target_record = f"{db_record}.{item_index}.vim_info.{target_vim}"
 
         created_items = self._prepare_created_items_for_healing(
-            target_vim, existing_instance
+            nsr_id, target_record
         )
 
         volumes_to_hold = self._prepare_persistent_volumes_for_healing(
@@ -1915,6 +1976,20 @@ class Ns(object):
         # The CREATE task depens on the DELETE task
         extra_dict["depends_on"] = [delete_task_id]
 
+        # Add volumes created from created_items if any
+        # Ports should be deleted with delete task and automatically created with create task
+        volumes = {}
+        for k, v in created_items.items():
+            try:
+                k_item, _, k_id = k.partition(":")
+                if k_item == "volume":
+                    volumes[k] = v
+            except Exception as e:
+                self.logger.error(
+                    "Error evaluating created item {}: {}".format(k, e)
+                )
+        extra_dict["previous_created_volumes"] = volumes
+
         deployment_info = {
             "action_id": action_id,
             "nsr_id": nsr_id,
@@ -1992,7 +2067,7 @@ class Ns(object):
         )
 
         # Delete all ro_tasks registered for the targets vdurs (target_record)
-        # If task of type CREATE exist then vim will try to get info form deleted VMs.
+        # If a task of type CREATE exists then the vim will try to get info from deleted VMs.
         # So remove all task related to target record.
         ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
         for change in changes_list:
@@ -2007,7 +2082,7 @@ class Ns(object):
                 },
                 fail_on_empty=False,
             )
-            
+
         step = "Updating database, Appending tasks to ro_tasks"
         self.upload_recreate_tasks(
             db_new_tasks=db_new_tasks,
@@ -2269,7 +2344,7 @@ class Ns(object):
                     error_text = "Error at {} {}: {}".format(
                         task["action"].lower(),
                         task["item"],
-                        ro_task["vim_info"].get("vim_details") or "unknown",
+                        ro_task["vim_info"].get("vim_message") or "unknown",
                     )
                     details.append(error_text)
                 elif task["status"] in ("SCHEDULED", "BUILD"):
@@ -2304,6 +2379,99 @@ class Ns(object):
 
         return None, None, True
 
+    def rebuild_start_stop_task(
+        self,
+        vdu_id,
+        vnf_id,
+        vdu_index,
+        action_id,
+        nsr_id,
+        task_index,
+        target_vim,
+        extra_dict,
+    ):
+        self._assign_vim(target_vim)
+        target_record = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_index)
+        target_record_id = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_id)
+        deployment_info = {
+            "action_id": action_id,
+            "nsr_id": nsr_id,
+            "task_index": task_index,
+        }
+
+        task = Ns._create_task(
+            deployment_info=deployment_info,
+            target_id=target_vim,
+            item="update",
+            action="EXEC",
+            target_record=target_record,
+            target_record_id=target_record_id,
+            extra_dict=extra_dict,
+        )
+        return task
+
+    def rebuild_start_stop(
+        self, session, action_dict, version, nsr_id, *args, **kwargs
+    ):
+        task_index = 0
+        extra_dict = {}
+        now = time()
+        action_id = action_dict.get("action_id", str(uuid4()))
+        step = ""
+        logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+        self.logger.debug(logging_text + "Enter")
+
+        action = list(action_dict.keys())[0]
+        task_dict = action_dict.get(action)
+        vim_vm_id = action_dict.get(action).get("vim_vm_id")
+
+        if action_dict.get("stop"):
+            action = "shutoff"
+        db_new_tasks = []
+        try:
+            step = "lock the operation & do task creation"
+            with self.write_lock:
+                extra_dict["params"] = {
+                    "vim_vm_id": vim_vm_id,
+                    "action": action,
+                }
+                task = self.rebuild_start_stop_task(
+                    task_dict["vdu_id"],
+                    task_dict["vnf_id"],
+                    task_dict["vdu_index"],
+                    action_id,
+                    nsr_id,
+                    task_index,
+                    task_dict["target_vim"],
+                    extra_dict,
+                )
+                db_new_tasks.append(task)
+                step = "upload Task to db"
+                self.upload_all_tasks(
+                    db_new_tasks=db_new_tasks,
+                    now=now,
+                )
+                self.logger.debug(
+                    logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
+                )
+                return (
+                    {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
+                    action_id,
+                    True,
+                )
+        except Exception as e:
+            if isinstance(e, (DbException, NsException)):
+                self.logger.error(
+                    logging_text + "Exit Exception while '{}': {}".format(step, e)
+                )
+            else:
+                e = traceback_format_exc()
+                self.logger.critical(
+                    logging_text + "Exit Exception while '{}': {}".format(step, e),
+                    exc_info=True,
+                )
+            raise NsException(e)
+
     def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
         nsrs = self.db.get_list("nsrs", {})
         return_data = []
@@ -2323,3 +2491,118 @@ class Ns(object):
             return_data.append(task["action_id"])
 
         return return_data, None, True
+
+    def migrate_task(
+        self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
+    ):
+        target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items())
+        self._assign_vim(target_vim)
+        target_record = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu_index)
+        target_record_id = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu["id"])
+        deployment_info = {
+            "action_id": action_id,
+            "nsr_id": nsr_id,
+            "task_index": task_index,
+        }
+
+        task = Ns._create_task(
+            deployment_info=deployment_info,
+            target_id=target_vim,
+            item="migrate",
+            action="EXEC",
+            target_record=target_record,
+            target_record_id=target_record_id,
+            extra_dict=extra_dict,
+        )
+
+        return task
+
+    def migrate(self, session, indata, version, nsr_id, *args, **kwargs):
+        task_index = 0
+        extra_dict = {}
+        now = time()
+        action_id = indata.get("action_id", str(uuid4()))
+        step = ""
+        logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+        self.logger.debug(logging_text + "Enter")
+        try:
+            vnf_instance_id = indata["vnfInstanceId"]
+            step = "Getting vnfrs from db"
+            db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+            vdu = indata.get("vdu")
+            migrateToHost = indata.get("migrateToHost")
+            db_new_tasks = []
+
+            with self.write_lock:
+                if vdu is not None:
+                    vdu_id = indata["vdu"]["vduId"]
+                    vdu_count_index = indata["vdu"].get("vduCountIndex", 0)
+                    for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
+                        if (
+                            vdu["vdu-id-ref"] == vdu_id
+                            and vdu["count-index"] == vdu_count_index
+                        ):
+                            extra_dict["params"] = {
+                                "vim_vm_id": vdu["vim-id"],
+                                "migrate_host": migrateToHost,
+                                "vdu_vim_info": vdu["vim_info"],
+                            }
+                            step = "Creating migration task for vdu:{}".format(vdu)
+                            task = self.migrate_task(
+                                vdu,
+                                db_vnfr,
+                                vdu_index,
+                                action_id,
+                                nsr_id,
+                                task_index,
+                                extra_dict,
+                            )
+                            db_new_tasks.append(task)
+                            task_index += 1
+                            break
+                else:
+
+                    for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
+                        extra_dict["params"] = {
+                            "vim_vm_id": vdu["vim-id"],
+                            "migrate_host": migrateToHost,
+                            "vdu_vim_info": vdu["vim_info"],
+                        }
+                        step = "Creating migration task for vdu:{}".format(vdu)
+                        task = self.migrate_task(
+                            vdu,
+                            db_vnfr,
+                            vdu_index,
+                            action_id,
+                            nsr_id,
+                            task_index,
+                            extra_dict,
+                        )
+                        db_new_tasks.append(task)
+                        task_index += 1
+
+            self.upload_all_tasks(
+                db_new_tasks=db_new_tasks,
+                now=now,
+            )
+
+            self.logger.debug(
+                logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
+            )
+            return (
+                {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
+                action_id,
+                True,
+            )
+        except Exception as e:
+            if isinstance(e, (DbException, NsException)):
+                self.logger.error(
+                    logging_text + "Exit Exception while '{}': {}".format(step, e)
+                )
+            else:
+                e = traceback_format_exc()
+                self.logger.critical(
+                    logging_text + "Exit Exception while '{}': {}".format(step, e),
+                    exc_info=True,
+                )
+            raise NsException(e)