X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=6b981371ed5994239938aeb3e6df56e5e1e2269f;hb=053422d42b10d20fefe5fdd565faf9658d92ba7f;hp=8c10554f7ac5a6f80c89e73362e408b2fb1f2c55;hpb=22f4f9c3f2fef11377202b95fe2333b78255f8de;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 8c10554..6b98137 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -22,19 +22,112 @@ from os import environ, path from n2vc.vnf import N2VC from n2vc import version as N2VC_version +from collections import OrderedDict from copy import deepcopy from http import HTTPStatus from time import time __author__ = "Alfonso Tierno" -min_RO_version = [0, 5, 69] +min_RO_version = [0, 5, 72] class LcmException(Exception): pass +class TaskRegistry: + """ + Implements a registry of task needed for later cancelation, look for related tasks that must be completed before + etc. It stores a four level dict + First level is the topic, ns, vim_account, sdn + Second level is the _id + Third level is the operation id + Fourth level is a descriptive name, the value is the task class + """ + + def __init__(self): + self.task_registry = { + "ns": {}, + "vim_account": {}, + "sdn": {}, + } + + def register(self, topic, _id, op_id, task_name, task): + """ + Register a new task + :param topic: Can be "ns", "vim_account", "sdn" + :param _id: _id of the related item + :param op_id: id of the operation of the related item + :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id + :param task: Task class + :return: none + """ + if _id not in self.task_registry[topic]: + self.task_registry[topic][_id] = OrderedDict() + if op_id not in self.task_registry[topic][_id]: + self.task_registry[topic][_id][op_id] = {task_name: task} + else: + self.task_registry[topic][_id][op_id][task_name] = task + # print("registering task", topic, _id, op_id, task_name, task) + + def remove(self, topic, _id, op_id, task_name=None): + """ + When task is ended, it should removed. It ignores missing tasks + :param topic: Can be "ns", "vim_account", "sdn" + :param _id: _id of the related item + :param op_id: id of the operation of the related item + :param task_name: Task descriptive name. If note it deletes all + :return: + """ + if not self.task_registry[topic].get(_id) or not self.task_registry[topic][_id].get(op_id): + return + if not task_name: + # print("deleting tasks", topic, _id, op_id, self.task_registry[topic][_id][op_id]) + del self.task_registry[topic][_id][op_id] + elif task_name in self.task_registry[topic][_id][op_id]: + # print("deleting tasks", topic, _id, op_id, task_name, self.task_registry[topic][_id][op_id][task_name]) + del self.task_registry[topic][_id][op_id][task_name] + if not self.task_registry[topic][_id][op_id]: + del self.task_registry[topic][_id][op_id] + if not self.task_registry[topic][_id]: + del self.task_registry[topic][_id] + + def lookfor_related(self, topic, _id, my_op_id=None): + task_list = [] + task_name_list = [] + if _id not in self.task_registry[topic]: + return "", task_name_list + for op_id in reversed(self.task_registry[topic][_id]): + if my_op_id: + if my_op_id == op_id: + my_op_id = None # so that the next task is taken + continue + + for task_name, task in self.task_registry[topic][_id][op_id].items(): + task_list.append(task) + task_name_list.append(task_name) + break + return ", ".join(task_name_list), task_list + + def cancel(self, topic, _id, target_op_id=None, target_task_name=None): + """ + Cancel all active tasks of a concrete ns, vim_account, sdn identified for _id. If op_id is supplied only this is + cancelled, and the same with task_name + """ + if not self.task_registry[topic].get(_id): + return + for op_id in reversed(self.task_registry[topic][_id]): + if target_op_id and target_op_id != op_id: + continue + for task_name, task in self.task_registry[topic][_id][op_id].items(): + if target_task_name and target_task_name != task_name: + continue + result = task.cancel() + if result: + self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) + + class Lcm: def __init__(self, config_file): @@ -50,9 +143,7 @@ class Lcm: self.pings_not_received = 1 # contains created tasks/futures to be able to cancel - self.lcm_ns_tasks = {} - self.lcm_vim_tasks = {} - self.lcm_sdn_tasks = {} + self.lcm_tasks = TaskRegistry() # logging self.logger = logging.getLogger('lcm') # load configuration @@ -161,12 +252,6 @@ class Lcm: self.logger.critical("Error while conneting to osm/RO " + str(e), exc_info=True) raise LcmException(str(e)) - def update_db(self, item, _id, _desc): - try: - self.db.replace(item, _id, _desc) - except DbException as e: - self.logger.error("Updating {} _id={}: {}".format(item, _id, e)) - def update_db_2(self, item, _id, _desc): """ Updates database with _desc information. Upon success _desc is cleared @@ -188,16 +273,13 @@ class Lcm: logging_text = "Task vim_create={} ".format(vim_id) self.logger.debug(logging_text + "Enter") db_vim = None + db_vim_update = {} exc = None RO_sdn_id = None try: step = "Getting vim-id='{}' from db".format(vim_id) db_vim = self.db.get_one("vim_accounts", {"_id": vim_id}) - if "_admin" not in db_vim: - db_vim["_admin"] = {} - if "deployed" not in db_vim["_admin"]: - db_vim["_admin"]["deployed"] = {} - db_vim["_admin"]["deployed"]["RO"] = None + db_vim_update["_admin.deployed.RO"] = None if vim_content.get("config") and vim_content["config"].get("sdn-controller"): step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"]) db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]}) @@ -208,6 +290,8 @@ class Lcm: vim_content["config"]["sdn-controller"])) step = "Creating vim at RO" + db_vim_update["_admin.detailed-status"] = step + self.update_db_2("vim_accounts", vim_id, db_vim_update) RO = ROclient.ROClient(self.loop, **self.ro_config) vim_RO = deepcopy(vim_content) vim_RO.pop("_id", None) @@ -222,10 +306,12 @@ class Lcm: vim_RO["config"]["sdn-controller"] = RO_sdn_id desc = await RO.create("vim", descriptor=vim_RO) RO_vim_id = desc["uuid"] - db_vim["_admin"]["deployed"]["RO"] = RO_vim_id - self.update_db("vim_accounts", vim_id, db_vim) + db_vim_update["_admin.deployed.RO"] = RO_vim_id step = "Creating vim_account at RO" + db_vim_update["_admin.detailed-status"] = step + self.update_db_2("vim_accounts", vim_id, db_vim_update) + vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"], "vim_username": vim_content["vim_user"], "vim_password": vim_content["vim_password"] @@ -236,12 +322,14 @@ class Lcm: del vim_account_RO["config"]["sdn-controller"] if "sdn-port-mapping" in vim_account_RO["config"]: del vim_account_RO["config"]["sdn-port-mapping"] - await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO) - db_vim["_admin"]["operationalState"] = "ENABLED" - self.update_db("vim_accounts", vim_id, db_vim) + desc = await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO) + db_vim_update["_admin.deployed.RO-account"] = desc["uuid"] + db_vim_update["_admin.operationalState"] = "ENABLED" + db_vim_update["_admin.detailed-status"] = "Done" - self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id)) - return RO_vim_id + # await asyncio.sleep(15) # TODO remove. This is for test + self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id)) + return except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -251,9 +339,11 @@ class Lcm: exc = e finally: if exc and db_vim: - db_vim["_admin"]["operationalState"] = "ERROR" - db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc) - self.update_db("vim_accounts", vim_id, db_vim) + db_vim_update["_admin.operationalState"] = "ERROR" + db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) + if db_vim_update: + self.update_db_2("vim_accounts", vim_id, db_vim_update) + self.lcm_tasks.remove("vim_account", vim_id, order_id) async def vim_edit(self, vim_content, order_id): vim_id = vim_content["_id"] @@ -262,13 +352,33 @@ class Lcm: db_vim = None exc = None RO_sdn_id = None + RO_vim_id = None + db_vim_update = {} step = "Getting vim-id='{}' from db".format(vim_id) try: db_vim = self.db.get_one("vim_accounts", {"_id": vim_id}) + + # look if previous tasks in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", vim_id, order_id) + if task_dependency: + step = "Waiting for related tasks to be completed: {}".format(task_name) + self.logger.debug(logging_text + step) + # TODO write this to database + await asyncio.wait(task_dependency, timeout=3600) + if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"): if vim_content.get("config") and vim_content["config"].get("sdn-controller"): step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"]) db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]}) + + # look if previous tasks in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("sdn", db_sdn["_id"]) + if task_dependency: + step = "Waiting for related tasks to be completed: {}".format(task_name) + self.logger.debug(logging_text + step) + # TODO write this to database + await asyncio.wait(task_dependency, timeout=3600) + if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get( "RO"): RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"] @@ -312,11 +422,10 @@ class Lcm: # vim_account must be edited always even if empty in order to ensure changes are translated to RO # vim_thread. RO will remove and relaunch a new thread for this vim_account await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO) - db_vim["_admin"]["operationalState"] = "ENABLED" - self.update_db("vim_accounts", vim_id, db_vim) + db_vim_update["_admin.operationalState"] = "ENABLED" - self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id)) - return RO_vim_id + self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id)) + return except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -326,14 +435,17 @@ class Lcm: exc = e finally: if exc and db_vim: - db_vim["_admin"]["operationalState"] = "ERROR" - db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc) - self.update_db("vim_accounts", vim_id, db_vim) + db_vim_update["_admin.operationalState"] = "ERROR" + db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) + if db_vim_update: + self.update_db_2("vim_accounts", vim_id, db_vim_update) + self.lcm_tasks.remove("vim_account", vim_id, order_id) async def vim_delete(self, vim_id, order_id): logging_text = "Task vim_delete={} ".format(vim_id) self.logger.debug(logging_text + "Enter") db_vim = None + db_vim_update = {} exc = None step = "Getting vim from db" try: @@ -360,10 +472,10 @@ class Lcm: raise else: # nothing to delete - self.logger.error(logging_text + "Skipping. There is not RO information at database") + self.logger.error(logging_text + "Nohing to remove at RO") self.db.del_one("vim_accounts", {"_id": vim_id}) - self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id)) - return None + self.logger.debug(logging_text + "Exit Ok") + return except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -372,25 +484,26 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) exc = e finally: + self.lcm_tasks.remove("vim_account", vim_id, order_id) if exc and db_vim: - db_vim["_admin"]["operationalState"] = "ERROR" - db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc) - self.update_db("vim_accounts", vim_id, db_vim) + db_vim_update["_admin.operationalState"] = "ERROR" + db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) + if db_vim_update: + self.update_db_2("vim_accounts", vim_id, db_vim_update) + self.lcm_tasks.remove("vim_account", vim_id, order_id) async def sdn_create(self, sdn_content, order_id): sdn_id = sdn_content["_id"] logging_text = "Task sdn_create={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None + db_sdn_update = {} + RO_sdn_id = None exc = None try: step = "Getting sdn from db" db_sdn = self.db.get_one("sdns", {"_id": sdn_id}) - if "_admin" not in db_sdn: - db_sdn["_admin"] = {} - if "deployed" not in db_sdn["_admin"]: - db_sdn["_admin"]["deployed"] = {} - db_sdn["_admin"]["deployed"]["RO"] = None + db_sdn_update["_admin.deployed.RO"] = None step = "Creating sdn at RO" RO = ROclient.ROClient(self.loop, **self.ro_config) @@ -402,11 +515,10 @@ class Lcm: sdn_RO.pop("description", None) desc = await RO.create("sdn", descriptor=sdn_RO) RO_sdn_id = desc["uuid"] - db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id - db_sdn["_admin"]["operationalState"] = "ENABLED" - self.update_db("sdns", sdn_id, db_sdn) - self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id)) - return RO_sdn_id + db_sdn_update["_admin.deployed.RO"] = RO_sdn_id + db_sdn_update["_admin.operationalState"] = "ENABLED" + self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id)) + return except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -416,15 +528,18 @@ class Lcm: exc = e finally: if exc and db_sdn: - db_sdn["_admin"]["operationalState"] = "ERROR" - db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc) - self.update_db("sdns", sdn_id, db_sdn) + db_sdn_update["_admin.operationalState"] = "ERROR" + db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) + if db_sdn_update: + self.update_db_2("sdns", sdn_id, db_sdn_update) + self.lcm_tasks.remove("sdn", sdn_id, order_id) async def sdn_edit(self, sdn_content, order_id): sdn_id = sdn_content["_id"] logging_text = "Task sdn_edit={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None + db_sdn_update = {} exc = None step = "Getting sdn from db" try: @@ -441,11 +556,10 @@ class Lcm: sdn_RO.pop("description", None) if sdn_RO: await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO) - db_sdn["_admin"]["operationalState"] = "ENABLED" - self.update_db("sdns", sdn_id, db_sdn) + db_sdn_update["_admin.operationalState"] = "ENABLED" self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id)) - return RO_sdn_id + return except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -455,14 +569,17 @@ class Lcm: exc = e finally: if exc and db_sdn: - db_sdn["_admin"]["operationalState"] = "ERROR" - db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc) - self.update_db("sdns", sdn_id, db_sdn) + db_sdn["_admin.operationalState"] = "ERROR" + db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) + if db_sdn_update: + self.update_db_2("sdns", sdn_id, db_sdn_update) + self.lcm_tasks.remove("sdn", sdn_id, order_id) async def sdn_delete(self, sdn_id, order_id): logging_text = "Task sdn_delete={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None + db_sdn_update = {} exc = None step = "Getting sdn from db" try: @@ -483,7 +600,7 @@ class Lcm: self.logger.error(logging_text + "Skipping. There is not RO information at database") self.db.del_one("sdns", {"_id": sdn_id}) self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id)) - return None + return except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -493,9 +610,11 @@ class Lcm: exc = e finally: if exc and db_sdn: - db_sdn["_admin"]["operationalState"] = "ERROR" - db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc) - self.update_db("sdns", sdn_id, db_sdn) + db_sdn["_admin.operationalState"] = "ERROR" + db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) + if db_sdn_update: + self.update_db_2("sdns", sdn_id, db_sdn_update) + self.lcm_tasks.remove("sdn", sdn_id, order_id) def vnfd2RO(self, vnfd, new_id=None): """ @@ -628,7 +747,7 @@ class Lcm: if vca_status != "active": all_active = False - elif vca_status in ("error", "blocked"): + if vca_status in ("error", "blocked"): n2vc_error_text.append("member_vnf_index={} {}: {}".format(member_vnf_index, vca_status, vca_info["detailed-status"])) @@ -669,7 +788,7 @@ class Lcm: self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format( member_vnf_index, e), exc_info=True) - def ns_params_2_RO(self, ns_params): + def ns_params_2_RO(self, ns_params, nsd, vnfd_dict): """ Creates a RO ns descriptor from OSM ns_instantite params :param ns_params: OSM instantiate params @@ -680,9 +799,8 @@ class Lcm: def vim_account_2_RO(vim_account): if vim_account in vim_2_RO: return vim_2_RO[vim_account] + db_vim = self.db.get_one("vim_accounts", {"_id": vim_account}) - # if db_vim["_admin"]["operationalState"] == "PROCESSING": - # #TODO check if VIM is creating and wait if db_vim["_admin"]["operationalState"] != "ENABLED": raise LcmException("VIM={} is not available. operationalState={}".format( vim_account, db_vim["_admin"]["operationalState"])) @@ -690,6 +808,18 @@ class Lcm: vim_2_RO[vim_account] = RO_vim_id return RO_vim_id + def ip_profile_2_RO(ip_profile): + RO_ip_profile = deepcopy((ip_profile)) + if "dns-server" in RO_ip_profile: + RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server") + if RO_ip_profile.get("ip-version") == "ipv4": + RO_ip_profile["ip-version"] = "IPv4" + if RO_ip_profile.get("ip-version") == "ipv6": + RO_ip_profile["ip-version"] = "IPv6" + if "dhcp-params" in RO_ip_profile: + RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params") + return RO_ip_profile + if not ns_params: return None RO_ns_params = { @@ -703,29 +833,96 @@ class Lcm: if ns_params.get("ssh-authorized-key"): RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]} if ns_params.get("vnf"): - for vnf in ns_params["vnf"]: - RO_vnf = {} - if "vimAccountId" in vnf: - RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"]) + for vnf_params in ns_params["vnf"]: + for constituent_vnfd in nsd["constituent-vnfd"]: + if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]: + vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]] + break + else: + raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:" + "constituent-vnfd".format(vnf_params["member-vnf-index"])) + RO_vnf = {"vdus": {}, "networks": {}} + if vnf_params.get("vimAccountId"): + RO_vnf["datacenter"] = vim_account_2_RO(vnf_params["vimAccountId"]) + if vnf_params.get("vdu"): + for vdu_params in vnf_params["vdu"]: + RO_vnf["vdus"][vdu_params["id"]] = {} + if vdu_params.get("volume"): + RO_vnf["vdus"][vdu_params["id"]]["devices"] = {} + for volume_params in vdu_params["volume"]: + RO_vnf["vdus"][vdu_params["id"]]["devices"][volume_params["name"]] = {} + if volume_params.get("vim-volume-id"): + RO_vnf["vdus"][vdu_params["id"]]["devices"][volume_params["name"]]["vim_id"] = \ + volume_params["vim-volume-id"] + if vdu_params.get("interface"): + RO_vnf["vdus"][vdu_params["id"]]["interfaces"] = {} + for interface_params in vdu_params["interface"]: + RO_interface = {} + RO_vnf["vdus"][vdu_params["id"]]["interfaces"][interface_params["name"]] = RO_interface + if interface_params.get("ip-address"): + RO_interface["ip_address"] = interface_params["ip-address"] + if interface_params.get("mac-address"): + RO_interface["mac_address"] = interface_params["mac-address"] + if interface_params.get("floating-ip-required"): + RO_interface["floating-ip"] = interface_params["floating-ip-required"] + if vnf_params.get("internal-vld"): + for internal_vld_params in vnf_params["internal-vld"]: + RO_vnf["networks"][internal_vld_params["name"]] = {} + if internal_vld_params.get("vim-network-name"): + RO_vnf["networks"][internal_vld_params["name"]]["vim-network-name"] = \ + internal_vld_params["vim-network-name"] + if internal_vld_params.get("ip-profile"): + RO_vnf["networks"][internal_vld_params["name"]]["ip-profile"] = \ + ip_profile_2_RO(internal_vld_params["ip-profile"]) + if internal_vld_params.get("internal-connection-point"): + for icp_params in internal_vld_params["internal-connection-point"]: + # look for interface + iface_found = False + for vdu_descriptor in vnf_descriptor["vdu"]: + for vdu_interface in vdu_descriptor["interface"]: + if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]: + if vdu_descriptor["id"] not in RO_vnf["vdus"]: + RO_vnf["vdus"][vdu_descriptor["id"]] = {} + if "interfaces" not in RO_vnf["vdus"][vdu_descriptor["id"]]: + RO_vnf["vdus"][vdu_descriptor["id"]]["interfaces"] = {} + RO_ifaces = RO_vnf["vdus"][vdu_descriptor["id"]]["interfaces"] + if vdu_interface["name"] not in RO_ifaces: + RO_ifaces[vdu_interface["name"]] = {} + + RO_ifaces[vdu_interface["name"]]["ip_address"] = icp_params["ip-address"] + iface_found = True + break + if iface_found: + break + else: + raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:" + "internal-vld:id-ref={} is not present at vnfd:internal-" + "connection-point".format(vnf_params["member-vnf-index"], + icp_params["id-ref"])) + + if not RO_vnf["vdus"]: + del RO_vnf["vdus"] + if not RO_vnf["networks"]: + del RO_vnf["networks"] if RO_vnf: - RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf + RO_ns_params["vnfs"][vnf_params["member-vnf-index"]] = RO_vnf if ns_params.get("vld"): - for vld in ns_params["vld"]: + for vld_params in ns_params["vld"]: RO_vld = {} - if "ip-profile" in vld: - RO_vld["ip-profile"] = vld["ip-profile"] - if "vim-network-name" in vld: + if "ip-profile" in vld_params: + RO_vld["ip-profile"] = ip_profile_2_RO(vld_params["ip-profile"]) + if "vim-network-name" in vld_params: RO_vld["sites"] = [] - if isinstance(vld["vim-network-name"], dict): - for vim_account, vim_net in vld["vim-network-name"].items(): + if isinstance(vld_params["vim-network-name"], dict): + for vim_account, vim_net in vld_params["vim-network-name"].items(): RO_vld["sites"].append({ "netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account) }) else: # isinstance str - RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]}) + RO_vld["sites"].append({"netmap-use": vld_params["vim-network-name"]}) if RO_vld: - RO_ns_params["networks"][vld["name"]] = RO_vld + RO_ns_params["networks"][vld_params["name"]] = RO_vld return RO_ns_params def ns_update_vnfr(self, db_vnfrs, ns_RO_info): @@ -862,6 +1059,7 @@ class Lcm: db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr={} from db".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + ns_params = db_nsr.get("instantiate_params") nsd = db_nsr["nsd"] nsr_name = db_nsr["name"] # TODO short-name?? needed_vnfd = {} @@ -961,7 +1159,24 @@ class Lcm: if not RO_nsr_id: step = db_nsr_update["detailed-status"] = "Creating ns at RO" # self.logger.debug(logging_text + step) - RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params")) + + # check if VIM is creating and wait look if previous tasks in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"]) + if task_dependency: + step = "Waiting for related tasks to be completed: {}".format(task_name) + self.logger.debug(logging_text + step) + await asyncio.wait(task_dependency, timeout=3600) + if ns_params.get("vnf"): + for vnf in ns_params["vnf"]: + if "vimAccountId" in vnf: + task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", + vnf["vimAccountId"]) + if task_dependency: + step = "Waiting for related tasks to be completed: {}".format(task_name) + self.logger.debug(logging_text + step) + await asyncio.wait(task_dependency, timeout=3600) + + RO_ns_params = self.ns_params_2_RO(ns_params, nsd, needed_vnfd) desc = await RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid) @@ -1092,7 +1307,7 @@ class Lcm: ) task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None, db_nsr, db_nslcmop, vnf_index)) - self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + vnf_index, task) step = "Looking for needed vnfd to configure" self.logger.debug(logging_text + step) @@ -1184,6 +1399,7 @@ class Lcm: self.update_db_2("nsrs", nsr_id, db_nsr_update) if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") async def ns_terminate(self, nsr_id, nslcmop_id): logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) @@ -1376,6 +1592,7 @@ class Lcm: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") async def _ns_execute_primitive(self, db_deployed, member_vnf_index, primitive, primitive_params): vca_deployed = db_deployed["VCA"].get(member_vnf_index) @@ -1469,6 +1686,7 @@ class Lcm: } if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action") async def ns_scale(self, nsr_id, nslcmop_id): logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) @@ -1485,12 +1703,14 @@ class Lcm: step = "Getting nsr from database" db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) step = "Parsing scaling parameters" + db_nsr_update["operational-status"] = "scaling" + self.update_db_2("nsrs", nsr_id, db_nsr_update) nsr_lcm = db_nsr["_admin"].get("deployed") RO_nsr_id = nsr_lcm["RO"]["nsr_id"] vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"] scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"] scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"] - scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy") + # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy") step = "Getting vnfr from database" db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}) @@ -1503,11 +1723,11 @@ class Lcm: else: raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present " "at vnfd:scaling-group-descriptor".format(scaling_group)) - cooldown_time = 0 - for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()): - cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0) - if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"): - break + # cooldown_time = 0 + # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()): + # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0) + # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"): + # break # TODO check if ns is in a proper status step = "Sending scale order to RO" @@ -1720,11 +1940,12 @@ class Lcm: db_nslcmop_update["statusEnteredTime"] = time() db_nslcmop_update["detailed-status"] = "done" db_nsr_update["detailed-status"] = "done" + db_nsr_update["operational-status"] = "running" try: await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - if cooldown_time: - await asyncio.sleep(cooldown_time) - await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + # if cooldown_time: + # await asyncio.sleep(cooldown_time) + # await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) except Exception as e: self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) self.logger.debug(logging_text + "Exit Ok") @@ -1740,44 +1961,24 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) finally: if exc: - db_nsr_update = None if db_nslcmop: db_nslcmop_update = { "detailed-status": "FAILED {}: {}".format(step, exc), "operationState": "FAILED", "statusEnteredTime": time(), } + if db_nsr: + db_nsr_update["operational-status"] = "FAILED {}: {}".format(step, exc), + db_nsr_update["detailed-status"] = "failed" if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") async def test(self, param=None): self.logger.debug("Starting/Ending test task: {}".format(param)) - def cancel_tasks(self, topic, _id): - """ - Cancel all active tasks of a concrete nsr or vim identified for _id - :param topic: can be ns or vim_account - :param _id: nsr or vim identity - :return: None, or raises an exception if not possible - """ - if topic == "ns": - lcm_tasks = self.lcm_ns_tasks - elif topic == "vim_account": - lcm_tasks = self.lcm_vim_tasks - elif topic == "sdn": - lcm_tasks = self.lcm_sdn_tasks - - if not lcm_tasks.get(_id): - return - for order_id, tasks_set in lcm_tasks[_id].items(): - for task_name, task in tasks_set.items(): - result = task.cancel() - if result: - self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name)) - lcm_tasks[_id] = {} - async def kafka_ping(self): self.logger.debug("Task kafka_ping Enter") consecutive_errors = 0 @@ -1850,20 +2051,16 @@ class Lcm: nslcmop_id = nslcmop["_id"] nsr_id = nslcmop["nsInstanceId"] task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id)) - if nsr_id not in self.lcm_ns_tasks: - self.lcm_ns_tasks[nsr_id] = {} - self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task} + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) continue elif command == "terminate": # self.logger.debug("Deleting NS {}".format(nsr_id)) nslcmop = params nslcmop_id = nslcmop["_id"] nsr_id = nslcmop["nsInstanceId"] - self.cancel_tasks(topic, nsr_id) + self.lcm_tasks.cancel(topic, nsr_id) task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id)) - if nsr_id not in self.lcm_ns_tasks: - self.lcm_ns_tasks[nsr_id] = {} - self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task} + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task) continue elif command == "action": # self.logger.debug("Update NS {}".format(nsr_id)) @@ -1871,9 +2068,7 @@ class Lcm: nslcmop_id = nslcmop["_id"] nsr_id = nslcmop["nsInstanceId"] task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id)) - if nsr_id not in self.lcm_ns_tasks: - self.lcm_ns_tasks[nsr_id] = {} - self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task} + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task) continue elif command == "scale": # self.logger.debug("Update NS {}".format(nsr_id)) @@ -1881,9 +2076,7 @@ class Lcm: nslcmop_id = nslcmop["_id"] nsr_id = nslcmop["nsInstanceId"] task = asyncio.ensure_future(self.ns_scale(nsr_id, nslcmop_id)) - if nsr_id not in self.lcm_ns_tasks: - self.lcm_ns_tasks[nsr_id] = {} - self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_scale": task} + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) continue elif command == "show": try: @@ -1899,20 +2092,18 @@ class Lcm: continue elif command == "deleted": continue # TODO cleaning of task just in case should be done + elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" + continue elif topic == "vim_account": vim_id = params["_id"] if command == "create": task = asyncio.ensure_future(self.vim_create(params, order_id)) - if vim_id not in self.lcm_vim_tasks: - self.lcm_vim_tasks[vim_id] = {} - self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task} + self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task) continue elif command == "delete": - self.cancel_tasks(topic, vim_id) + self.lcm_tasks.cancel(topic, vim_id) task = asyncio.ensure_future(self.vim_delete(vim_id, order_id)) - if vim_id not in self.lcm_vim_tasks: - self.lcm_vim_tasks[vim_id] = {} - self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task} + self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task) continue elif command == "show": print("not implemented show with vim_account") @@ -1920,30 +2111,22 @@ class Lcm: continue elif command == "edit": task = asyncio.ensure_future(self.vim_edit(params, order_id)) - if vim_id not in self.lcm_vim_tasks: - self.lcm_vim_tasks[vim_id] = {} - self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task} + self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task) continue elif topic == "sdn": _sdn_id = params["_id"] if command == "create": task = asyncio.ensure_future(self.sdn_create(params, order_id)) - if _sdn_id not in self.lcm_sdn_tasks: - self.lcm_sdn_tasks[_sdn_id] = {} - self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task} + self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task) continue elif command == "delete": - self.cancel_tasks(topic, _sdn_id) + self.lcm_tasks.cancel(topic, _sdn_id) task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id)) - if _sdn_id not in self.lcm_sdn_tasks: - self.lcm_sdn_tasks[_sdn_id] = {} - self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task} + self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task) continue elif command == "edit": task = asyncio.ensure_future(self.sdn_edit(params, order_id)) - if _sdn_id not in self.lcm_sdn_tasks: - self.lcm_sdn_tasks[_sdn_id] = {} - self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task} + self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task) continue self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) except Exception as e: @@ -1972,14 +2155,14 @@ class Lcm: )) # TODO # self.logger.debug("Terminating cancelling creation tasks") - # self.cancel_tasks("ALL", "create") + # self.lcm_tasks.cancel("ALL", "create") # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") # await asyncio.sleep(2, loop=self.loop) # timeout -= 2 # if not timeout: - # self.cancel_tasks("ALL", "ALL") + # self.lcm_tasks.cancel("ALL", "ALL") self.loop.close() self.loop = None if self.db: