From e876f6786da64cb81571ab37a25fb9f2cf61d359 Mon Sep 17 00:00:00 2001 From: tierno Date: Thu, 13 Feb 2020 14:34:48 +0000 Subject: [PATCH] fix terminate primitives. Enhance detailed-status report enhance KDU error reporting Change-Id: Ia07ef61785f6a6e577ffe4842da9bb3a63de6261 Signed-off-by: tierno --- osm_lcm/ns.py | 1709 +++++++++++++------------- osm_lcm/tests/test_db_descriptors.py | 4 + osm_lcm/tests/test_ns.py | 65 +- 3 files changed, 948 insertions(+), 830 deletions(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 8705823..a7affce 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -46,12 +46,14 @@ __author__ = "Alfonso Tierno" class NsLcm(LcmBase): timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns + timeout_ns_terminate = 1800 # default global timeout for un deployment a ns timeout_charm_delete = 10 * 60 timeout_primitive = 10 * 60 # timeout for primitive execution SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 SUBOPERATION_STATUS_SKIP = -3 + task_name_deploy_vca = "Deploy VCA" def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ @@ -600,6 +602,21 @@ class NsLcm(LcmBase): else: raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"])) + def set_vnfr_at_error(self, db_vnfrs, error_text): + try: + for db_vnfr in db_vnfrs.values(): + vnfr_update = {"status": "ERROR"} + for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")): + if "status" not in vdur: + vdur["status"] = "ERROR" + vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR" + if error_text: + vdur["status-detailed"] = str(error_text) + vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR" + self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update) + except DbException as e: + self.logger.error("Cannot update vnf. {}".format(e)) + def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO): """ Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated @@ -725,197 +742,223 @@ class NsLcm(LcmBase): primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []}) return primitive_list - async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, - db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list): + async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, + n2vc_key_list, stage): + try: + db_nsr_update = {} + RO_descriptor_number = 0 # number of descriptors created at RO + vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO + nslcmop_id = db_nslcmop["_id"] + start_deploy = time() + ns_params = db_nslcmop.get("operationParams") + if ns_params and ns_params.get("timeout_ns_deploy"): + timeout_ns_deploy = ns_params["timeout_ns_deploy"] + else: + timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy) - db_nsr_update = {} - RO_descriptor_number = 0 # number of descriptors created at RO - vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO - start_deploy = time() - ns_params = db_nslcmop.get("operationParams") - if ns_params and ns_params.get("timeout_ns_deploy"): - timeout_ns_deploy = ns_params["timeout_ns_deploy"] - else: - timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy) + # Check for and optionally request placement optimization. Database will be updated if placement activated + stage[2] = "Waiting for Placement." + await self.do_placement(logging_text, db_nslcmop, db_vnfrs) - # Check for and optionally request placement optimization. Database will be updated if placement activated - await self.do_placement(logging_text, db_nslcmop, db_vnfrs) + # deploy RO - # deploy RO + # get vnfds, instantiate at RO + for c_vnf in nsd.get("constituent-vnfd", ()): + member_vnf_index = c_vnf["member-vnf-index"] + vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']] + vnfd_ref = vnfd["id"] - # get vnfds, instantiate at RO + stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index) + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) - for c_vnf in nsd.get("constituent-vnfd", ()): - member_vnf_index = c_vnf["member-vnf-index"] - vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']] - vnfd_ref = vnfd["id"] - step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \ - " RO".format(vnfd_ref, member_vnf_index) - # self.logger.debug(logging_text + step) - vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23]) - vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO - RO_descriptor_number += 1 + # self.logger.debug(logging_text + stage[2]) + vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23]) + vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO + RO_descriptor_number += 1 - # look position at deployed.RO.vnfd if not present it will be appended at the end - for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]): - if vnf_deployed["member-vnf-index"] == member_vnf_index: - break - else: - index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]) - db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None) - - # look if present - RO_update = {"member-vnf-index": member_vnf_index} - vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO}) - if vnfd_list: - RO_update["id"] = vnfd_list[0]["uuid"] - self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}". - format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"])) - else: - vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]]. - get("additionalParamsForVnf"), nsr_id) - desc = await self.RO.create("vnfd", descriptor=vnfd_RO) - RO_update["id"] = desc["uuid"] - self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format( - vnfd_ref, member_vnf_index, desc["uuid"])) - db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update - db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update + # look position at deployed.RO.vnfd if not present it will be appended at the end + for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]): + if vnf_deployed["member-vnf-index"] == member_vnf_index: + break + else: + index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]) + db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None) + + # look if present + RO_update = {"member-vnf-index": member_vnf_index} + vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO}) + if vnfd_list: + RO_update["id"] = vnfd_list[0]["uuid"] + self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}". + format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"])) + else: + vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]]. + get("additionalParamsForVnf"), nsr_id) + desc = await self.RO.create("vnfd", descriptor=vnfd_RO) + RO_update["id"] = desc["uuid"] + self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format( + vnfd_ref, member_vnf_index, desc["uuid"])) + db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update + db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update + + # create nsd at RO + nsd_ref = nsd["id"] + + stage[2] = "Creating nsd={} at RO".format(nsd_ref) + db_nsr_update["detailed-status"] = " ".join(stage) self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) - # create nsd at RO - nsd_ref = nsd["id"] - step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref) - # self.logger.debug(logging_text + step) - - RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23]) - RO_descriptor_number += 1 - nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id}) - if nsd_list: - db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"] - self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format( - nsd_ref, RO_nsd_uuid)) - else: - nsd_RO = deepcopy(nsd) - nsd_RO["id"] = RO_osm_nsd_id - nsd_RO.pop("_id", None) - nsd_RO.pop("_admin", None) - for c_vnf in nsd_RO.get("constituent-vnfd", ()): - member_vnf_index = c_vnf["member-vnf-index"] - c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index] - for c_vld in nsd_RO.get("vld", ()): - for cp in c_vld.get("vnfd-connection-point-ref", ()): - member_vnf_index = cp["member-vnf-index-ref"] - cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index] - - desc = await self.RO.create("nsd", descriptor=nsd_RO) - db_nsr_update["_admin.nsState"] = "INSTANTIATED" - db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"] - self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid)) - self.update_db_2("nsrs", nsr_id, db_nsr_update) + # self.logger.debug(logging_text + stage[2]) + RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23]) + RO_descriptor_number += 1 + nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id}) + if nsd_list: + db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"] + self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format( + nsd_ref, RO_nsd_uuid)) + else: + nsd_RO = deepcopy(nsd) + nsd_RO["id"] = RO_osm_nsd_id + nsd_RO.pop("_id", None) + nsd_RO.pop("_admin", None) + for c_vnf in nsd_RO.get("constituent-vnfd", ()): + member_vnf_index = c_vnf["member-vnf-index"] + c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index] + for c_vld in nsd_RO.get("vld", ()): + for cp in c_vld.get("vnfd-connection-point-ref", ()): + member_vnf_index = cp["member-vnf-index-ref"] + cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index] + + desc = await self.RO.create("nsd", descriptor=nsd_RO) + db_nsr_update["_admin.nsState"] = "INSTANTIATED" + db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"] + self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid)) + self.update_db_2("nsrs", nsr_id, db_nsr_update) - # Crate ns at RO - # if present use it unless in error status - RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id")) - if RO_nsr_id: - try: - step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO" - # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id)) - desc = await self.RO.show("ns", RO_nsr_id) + # Crate ns at RO + stage[2] = "Creating nsd={} at RO".format(nsd_ref) + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) - except ROclient.ROClientException as e: - if e.http_code != HTTPStatus.NOT_FOUND: - raise - RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None + # if present use it unless in error status + RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id")) if RO_nsr_id: - ns_status, ns_status_info = self.RO.check_ns_status(desc) - db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status - if ns_status == "ERROR": - step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\ - .format(RO_nsr_id) - self.logger.debug(logging_text + step) - await self.RO.delete("ns", RO_nsr_id) + try: + stage[2] = "Looking for existing ns at RO" + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id)) + desc = await self.RO.show("ns", RO_nsr_id) + + except ROclient.ROClientException as e: + if e.http_code != HTTPStatus.NOT_FOUND: + raise RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None - if not RO_nsr_id: - step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies" - # self.logger.debug(logging_text + step) - - # check if VIM is creating and wait look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"]) - if task_dependency: - step = "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - await asyncio.wait(task_dependency, timeout=3600) - if ns_params.get("vnf"): - for vnf in ns_params["vnf"]: - if "vimAccountId" in vnf: - task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", - vnf["vimAccountId"]) - if task_dependency: - step = "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - await asyncio.wait(task_dependency, timeout=3600) - - step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters" - - RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list) - - step = db_nsr_update["detailed-status"] = "Deploying ns at VIM" - # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM" - desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid) - RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"] - db_nsr_update["_admin.nsState"] = "INSTANTIATED" - db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD" - self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"])) - self.update_db_2("nsrs", nsr_id, db_nsr_update) + if RO_nsr_id: + ns_status, ns_status_info = self.RO.check_ns_status(desc) + db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status + if ns_status == "ERROR": + stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id) + self.logger.debug(logging_text + stage[2]) + await self.RO.delete("ns", RO_nsr_id) + RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None + if not RO_nsr_id: + stage[2] = "Checking dependencies" + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + # self.logger.debug(logging_text + stage[2]) + + # check if VIM is creating and wait look if previous tasks in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"]) + if task_dependency: + stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name) + self.logger.debug(logging_text + stage[2]) + await asyncio.wait(task_dependency, timeout=3600) + if ns_params.get("vnf"): + for vnf in ns_params["vnf"]: + if "vimAccountId" in vnf: + task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", + vnf["vimAccountId"]) + if task_dependency: + stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name) + self.logger.debug(logging_text + stage[2]) + await asyncio.wait(task_dependency, timeout=3600) + + stage[2] = "Checking instantiation parameters." + RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list) + stage[2] = "Deploying ns at VIM." + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) - # wait until NS is ready - step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id) - detailed_status_old = None - self.logger.debug(logging_text + step) - - old_desc = None - while time() <= start_deploy + timeout_ns_deploy: - desc = await self.RO.show("ns", RO_nsr_id) - - # deploymentStatus - if desc != old_desc: - # desc has changed => update db - self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc) - old_desc = desc - - ns_status, ns_status_info = self.RO.check_ns_status(desc) - db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status - if ns_status == "ERROR": - raise ROclient.ROClientException(ns_status_info) - elif ns_status == "BUILD": - detailed_status = ns_status_detailed + "; {}".format(ns_status_info) - elif ns_status == "ACTIVE": - step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs" - try: - self.ns_update_vnfr(db_vnfrs, desc) - break - except LcmExceptionNoMgmtIP: - pass - else: - assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) - if detailed_status != detailed_status_old: - detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status - self.update_db_2("nsrs", nsr_id, db_nsr_update) - await asyncio.sleep(5, loop=self.loop) - else: # timeout_ns_deploy - raise ROclient.ROClientException("Timeout waiting ns to be ready") + desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid) + RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"] + db_nsr_update["_admin.nsState"] = "INSTANTIATED" + db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD" + self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"])) - step = "Updating NSR" - self.ns_update_nsr(db_nsr_update, db_nsr, desc) + # wait until NS is ready + stage[2] = "Waiting VIM to deploy ns." + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + detailed_status_old = None + self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id)) - db_nsr_update["_admin.deployed.RO.operational-status"] = "running" - db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM" - db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM" - self.update_db_2("nsrs", nsr_id, db_nsr_update) - # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update) + old_desc = None + while time() <= start_deploy + timeout_ns_deploy: + desc = await self.RO.show("ns", RO_nsr_id) - step = "Deployed at VIM" - self.logger.debug(logging_text + step) + # deploymentStatus + if desc != old_desc: + # desc has changed => update db + self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc) + old_desc = desc + + ns_status, ns_status_info = self.RO.check_ns_status(desc) + db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + stage[2] = "VIM: ({})".format(ns_status_info) + elif ns_status == "ACTIVE": + stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs." + try: + self.ns_update_vnfr(db_vnfrs, desc) + break + except LcmExceptionNoMgmtIP: + pass + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + if stage[2] != detailed_status_old: + detailed_status_old = stage[2] + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + await asyncio.sleep(5, loop=self.loop) + else: # timeout_ns_deploy + raise ROclient.ROClientException("Timeout waiting ns to be ready") + + # Updating NSR + self.ns_update_nsr(db_nsr_update, db_nsr, desc) + + db_nsr_update["_admin.deployed.RO.operational-status"] = "running" + # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM" + stage[2] = "Deployed at VIM" + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update) + # self.logger.debug(logging_text + "Deployed at VIM") + except (ROclient.ROClientException, LcmException, DbException) as e: + self.set_vnfr_at_error(db_vnfrs, str(e)) + raise async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None): """ @@ -944,18 +987,14 @@ class NsLcm(LcmBase): raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)) await asyncio.sleep(10, loop=self.loop) - # wait until NS is deployed at RO - if not ro_nsr_id: - db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id}) - ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id")) - if not ro_nsr_id: - continue # get ip address if not target_vdu_id: db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}) if not vdu_id: # for the VNF case + if db_vnfr.get("status") == "ERROR": + raise LcmException("Cannot inject ssh-key because target VNF is in error state") ip_address = db_vnfr.get("ip-address") if not ip_address: continue @@ -983,6 +1022,13 @@ class NsLcm(LcmBase): # inject public key into machine if pub_key and user: + # wait until NS is deployed at RO + if not ro_nsr_id: + db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id}) + ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id")) + if not ro_nsr_id: + continue + # self.logger.debug(logging_text + "Inserting RO key") if vdur.get("pdu-type"): self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU") @@ -1050,8 +1096,8 @@ class NsLcm(LcmBase): raise LcmException("Configuration aborted because dependent charm/s timeout") - async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, - vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id): + async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index, + config_descriptor, deploy_params, base_folder, nslcmop_id, stage): nsr_id = db_nsr["_id"] db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] @@ -1237,13 +1283,13 @@ class NsLcm(LcmBase): my_vca = vca_deployed_list[vca_index] if my_vca.get("vdu_id") or my_vca.get("kdu_name"): # VDU or KDU - stage = 'Stage 3/5: running Day-1 primitives for VDU' + stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.' elif my_vca.get("member-vnf-index"): # VNF - stage = 'Stage 4/5: running Day-1 primitives for VNF' + stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.' else: # NS - stage = 'Stage 5/5: running Day-1 primitives for NS' + stage[0] = 'Stage 5/5: running Day-1 primitives for NS.' self._write_configuration_status( nsr_id=nsr_id, @@ -1256,6 +1302,7 @@ class NsLcm(LcmBase): stage=stage ) + check_if_terminated_needed = True for initial_config_primitive in initial_config_primitive_list: # adding information on the vca_deployed if it is a NS execution environment if not vca_deployed["member-vnf-index"]: @@ -1271,6 +1318,11 @@ class NsLcm(LcmBase): params_dict=primitive_params_, db_dict=db_dict ) + # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives + if check_if_terminated_needed: + if config_descriptor.get('terminate-config-primitive'): + self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True}) + check_if_terminated_needed = False # TODO register in database that primitive is done @@ -1285,36 +1337,60 @@ class NsLcm(LcmBase): except Exception as e: # TODO not use Exception but N2VC exception # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"}) + if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)): + self.logger.error("Exception while {} : {}".format(step, e), exc_info=True) self._write_configuration_status( nsr_id=nsr_id, vca_index=vca_index, status='BROKEN' ) - raise Exception("{} {}".format(step, e)) from e - # TODO raise N2VC exception with 'step' extra information + raise LcmException("{} {}".format(step, e)) from e def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str, - error_description: str = None): + error_description: str = None, other_update: dict = None): + """ + Update db_nsr fields. + :param nsr_id: + :param ns_state: + :param current_operation: + :param current_operation_id: + :param error_description: + :param other_update: Other required changes at database if provided, will be cleared + :return: + """ try: - db_dict = dict() - if ns_state: - db_dict["nsState"] = ns_state + db_dict = other_update or {} + db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility + db_dict["_admin.current-operation"] = current_operation_id + db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None db_dict["currentOperation"] = current_operation db_dict["currentOperationID"] = current_operation_id db_dict["errorDescription"] = error_description + + if ns_state: + db_dict["nsState"] = ns_state self.update_db_2("nsrs", nsr_id, db_dict) - except Exception as e: + except DbException as e: self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e)) - def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0): + def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0, + operation_state: str = None, other_update: dict = None): try: - db_dict = dict() + db_dict = other_update or {} db_dict['queuePosition'] = queuePosition - db_dict['stage'] = stage - if error_message: + if isinstance(stage, list): + db_dict['stage'] = stage[0] + db_dict['detailed-status'] = " ".join(stage) + elif stage is not None: + db_dict['stage'] = str(stage) + + if error_message is not None: db_dict['errorMessage'] = error_message + if operation_state is not None: + db_dict['operationState'] = operation_state + db_dict["statusEnteredTime"] = time() self.update_db_2("nslcmops", op_id, db_dict) - except Exception as e: + except DbException as e: self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e)) def _write_all_config_status(self, nsr_id: str, status: str): @@ -1332,7 +1408,7 @@ class NsLcm(LcmBase): db_dict['configurationStatus'].append(c) self.update_db_2("nsrs", nsr_id, db_dict) - except Exception as e: + except DbException as e: self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e)) def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None, @@ -1351,7 +1427,7 @@ class NsLcm(LcmBase): if element_type: db_dict[db_path + 'elementType'] = element_type self.update_db_2("nsrs", nsr_id, db_dict) - except Exception as e: + except DbException as e: self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}' .format(status, nsr_id, vca_index, e)) @@ -1412,37 +1488,42 @@ class NsLcm(LcmBase): db_nslcmop = None # update operation on nsrs - db_nsr_update = {"_admin.nslcmop": nslcmop_id, - "_admin.current-operation": nslcmop_id, - "_admin.operation-type": "instantiate"} - self.update_db_2("nsrs", nsr_id, db_nsr_update) - + db_nsr_update = {} # update operation on nslcmops db_nslcmop_update = {} nslcmop_operation_state = None db_vnfrs = {} # vnf's info indexed by member-index # n2vc_info = {} - task_instantiation_list = [] - task_instantiation_info = {} # from task to info text + tasks_dict_info = {} # from task to info text exc = None + error_list = [] + stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""] + # ^ stage, step, VIM progress try: # wait for any previous tasks in process - step = "Waiting for previous operations to terminate" await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds) - + stage[1] = "Reading from database," # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id + db_nsr_update["detailed-status"] = "creating" + db_nsr_update["operational-status"] = "init" self._write_ns_status( nsr_id=nsr_id, ns_state="BUILDING", current_operation="INSTANTIATING", - current_operation_id=nslcmop_id + current_operation_id=nslcmop_id, + other_update=db_nsr_update + ) + self._write_op_status( + op_id=nslcmop_id, + stage=stage, + queuePosition=0 ) # read from db: operation - step = "Getting nslcmop={} from db".format(nslcmop_id) + stage[1] = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) ns_params = db_nslcmop.get("operationParams") if ns_params and ns_params.get("timeout_ns_deploy"): @@ -1451,15 +1532,15 @@ class NsLcm(LcmBase): timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy) # read from db: ns - step = "Getting nsr={} from db".format(nsr_id) + stage[1] = "Getting nsr={} from db".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) # nsd is replicated into ns (no db read) nsd = db_nsr["nsd"] # nsr_name = db_nsr["name"] # TODO short-name?? # read from db: vnf's of this ns - step = "Getting vnfrs from db" - self.logger.debug(logging_text + step) + stage[1] = "Getting vnfrs from db" + self.logger.debug(logging_text + stage[1]) db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) # read from db: vnfd's for every vnf @@ -1467,12 +1548,6 @@ class NsLcm(LcmBase): db_vnfds = {} # every vnfd data indexed by vnf id db_vnfds_index = {} # every vnfd data indexed by vnf member-index - self._write_op_status( - op_id=nslcmop_id, - stage='Stage 1/5: preparation of the environment', - queuePosition=0 - ) - # for each vnf in ns, read vnfd for vnfr in db_vnfrs_list: db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc @@ -1481,8 +1556,8 @@ class NsLcm(LcmBase): # if we haven't this vnfd, read it from db if vnfd_id not in db_vnfds: # read from db - step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref) - self.logger.debug(logging_text + step) + stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref) + self.logger.debug(logging_text + stage[1]) vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) # store vnfd @@ -1507,9 +1582,6 @@ class NsLcm(LcmBase): db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list) - db_nsr_update["detailed-status"] = "creating" - db_nsr_update["operational-status"] = "init" - if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list): populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), []) db_nsr_update["_admin.deployed.RO.vnfd"] = [] @@ -1519,26 +1591,25 @@ class NsLcm(LcmBase): self.update_db_2("nsrs", nsr_id, db_nsr_update) # n2vc_redesign STEP 2 Deploy Network Scenario - + stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.' self._write_op_status( op_id=nslcmop_id, - stage='Stage 2/5: deployment of VMs and execution environments' + stage=stage ) - self.logger.debug(logging_text + "Before deploy_kdus") + stage[1] = "Deploying KDUs," + # self.logger.debug(logging_text + "Before deploy_kdus") # Call to deploy_kdus in case exists the "vdu:kdu" param - task_kdu = asyncio.ensure_future( - self.deploy_kdus( - logging_text=logging_text, - nsr_id=nsr_id, - db_nsr=db_nsr, - db_vnfrs=db_vnfrs, - db_vnfds=db_vnfds - ) + await self.deploy_kdus( + logging_text=logging_text, + nsr_id=nsr_id, + nslcmop_id=nslcmop_id, + db_vnfrs=db_vnfrs, + db_vnfds=db_vnfds, + task_instantiation_info=tasks_dict_info, ) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu) - task_instantiation_info[task_kdu] = "Deploy KDUs" - task_instantiation_list.append(task_kdu) + + stage[1] = "Getting VCA public key." # n2vc_redesign STEP 1 Get VCA public ssh-key # feature 1429. Add n2vc public key to needed VMs n2vc_key = self.n2vc.get_public_key() @@ -1546,6 +1617,7 @@ class NsLcm(LcmBase): if self.vca_config.get("public_key"): n2vc_key_list.append(self.vca_config["public_key"]) + stage[1] = "Deploying NS at VIM." task_ro = asyncio.ensure_future( self.instantiate_RO( logging_text=logging_text, @@ -1555,16 +1627,16 @@ class NsLcm(LcmBase): db_nslcmop=db_nslcmop, db_vnfrs=db_vnfrs, db_vnfds_ref=db_vnfds_ref, - n2vc_key_list=n2vc_key_list + n2vc_key_list=n2vc_key_list, + stage=stage ) ) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro) - task_instantiation_info[task_ro] = "Deploy at VIM" - task_instantiation_list.append(task_ro) + tasks_dict_info[task_ro] = "Deploy at VIM" # n2vc_redesign STEP 3 to 6 Deploy N2VC - step = "Deploying proxy and native charms" - self.logger.debug(logging_text + step) + stage[1] = "Deploying Execution Environments." + self.logger.debug(logging_text + stage[1]) nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI # get_iterable() returns a value from a dict or empty tuple if key does not exist @@ -1602,8 +1674,8 @@ class NsLcm(LcmBase): deploy_params=deploy_params, descriptor_config=descriptor_config, base_folder=base_folder, - task_instantiation_list=task_instantiation_list, - task_instantiation_info=task_instantiation_info + task_instantiation_info=tasks_dict_info, + stage=stage ) # Deploy charms for each VDU that supports one. @@ -1645,8 +1717,7 @@ class NsLcm(LcmBase): deploy_params=deploy_params_vdu, descriptor_config=descriptor_config, base_folder=base_folder, - task_instantiation_list=task_instantiation_list, - task_instantiation_info=task_instantiation_info + task_instantiation_info=tasks_dict_info ) for kdud in get_iterable(vnfd, 'kdu'): kdu_name = kdud["name"] @@ -1681,8 +1752,7 @@ class NsLcm(LcmBase): deploy_params=deploy_params, descriptor_config=descriptor_config, base_folder=base_folder, - task_instantiation_list=task_instantiation_list, - task_instantiation_info=task_instantiation_info + task_instantiation_info=tasks_dict_info ) # Check if this NS has a charm configuration @@ -1717,138 +1787,85 @@ class NsLcm(LcmBase): deploy_params=deploy_params, descriptor_config=descriptor_config, base_folder=base_folder, - task_instantiation_list=task_instantiation_list, - task_instantiation_info=task_instantiation_info + task_instantiation_info=tasks_dict_info ) - # Wait until all tasks of "task_instantiation_list" have been finished - - error_text_list = [] - - # let's begin with all OK - instantiated_ok = True - # let's begin with RO 'running' status (later we can change it) - db_nsr_update["operational-status"] = "running" - # let's begin with VCA 'configured' status (later we can change it) - db_nsr_update["config-status"] = "configured" - - step = "Waiting for tasks to be finished" - if task_instantiation_list: - # wait for all tasks completion - done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy) + # rest of staff will be done at finally - for task in pending: - instantiated_ok = False - if task in (task_ro, task_kdu): - # RO or KDU task is pending - db_nsr_update["operational-status"] = "failed" - else: - # A N2VC task is pending - db_nsr_update["config-status"] = "failed" - self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout") - error_text_list.append(task_instantiation_info[task] + ": Timeout") - for task in done: - if task.cancelled(): - instantiated_ok = False - if task in (task_ro, task_kdu): - # RO or KDU task was cancelled - db_nsr_update["operational-status"] = "failed" - else: - # A N2VC was cancelled - db_nsr_update["config-status"] = "failed" - self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled") - error_text_list.append(task_instantiation_info[task] + ": Cancelled") - else: - exc = task.exception() - if exc: - instantiated_ok = False - if task in (task_ro, task_kdu): - # RO or KDU task raised an exception - db_nsr_update["operational-status"] = "failed" - else: - # A N2VC task raised an exception - db_nsr_update["config-status"] = "failed" - self.logger.error(logging_text + task_instantiation_info[task] + ": Failed") - - if isinstance(exc, (N2VCException, ROclient.ROClientException)): - error_text_list.append(task_instantiation_info[task] + ": {}".format(exc)) - else: - exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__)) - self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback) - error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback) - else: - self.logger.debug(logging_text + task_instantiation_info[task] + ": Done") - - if error_text_list: - error_text = "\n".join(error_text_list) - db_nsr_update["detailed-status"] = error_text - db_nslcmop_update["detailed-status"] = error_text - db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" - db_nslcmop_update["statusEnteredTime"] = time() - else: - # all is done - db_nsr_update["detailed-status"] = "done" - db_nslcmop_update["detailed-status"] = "done" - db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" - db_nslcmop_update["statusEnteredTime"] = time() - - except (ROclient.ROClientException, DbException, LcmException) as e: - self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) + except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e: + self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e)) exc = e except asyncio.CancelledError: - self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1])) exc = "Operation was cancelled" except Exception as e: exc = traceback.format_exc() - self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), - exc_info=True) + self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True) finally: if exc: - instantiated_ok = False - if db_nsr: - db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc) - db_nsr_update["operational-status"] = "failed" - db_nsr_update["config-status"] = "failed" - if db_nslcmop: - db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" - db_nslcmop_update["statusEnteredTime"] = time() + error_list.append(str(exc)) try: - if db_nsr: - db_nsr_update["_admin.nslcmop"] = None - db_nsr_update["_admin.current-operation"] = None - db_nsr_update["_admin.operation-type"] = None - self.update_db_2("nsrs", nsr_id, db_nsr_update) - - # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None - ns_state = None - error_description = None - if instantiated_ok: - ns_state = "READY" + # wait for pending tasks + if tasks_dict_info: + stage[1] = "Waiting for instantiate pending tasks." + self.logger.debug(logging_text + stage[1]) + error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy, + stage, nslcmop_id, nsr_id=nsr_id) + stage[1] = stage[2] = "" + except asyncio.CancelledError: + error_list.append("Cancelled") + # TODO cancel all tasks + except Exception as exc: + error_list.append(str(exc)) + + # update operation-status + db_nsr_update["operational-status"] = "running" + # let's begin with VCA 'configured' status (later we can change it) + db_nsr_update["config-status"] = "configured" + for task, task_name in tasks_dict_info.items(): + if not task.done() or task.cancelled() or task.exception(): + if task_name.startswith(self.task_name_deploy_vca): + # A N2VC task is pending + db_nsr_update["config-status"] = "failed" else: - ns_state = "BROKEN" - error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step) - - self._write_ns_status( - nsr_id=nsr_id, - ns_state=ns_state, - current_operation="IDLE", - current_operation_id=None, - error_description=error_description - ) - - self._write_op_status( - op_id=nslcmop_id, - error_message=error_description - ) - - if db_nslcmop_update: - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + # RO or KDU task is pending + db_nsr_update["operational-status"] = "failed" - self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok)) + # update status at database + if error_list: + error_detail = "; ".join(error_list) + self.logger.error(logging_text + error_detail) + error_description_nslcmop = 'Stage: {} {} Detail: {}'.format(stage[0], stage[1], error_detail) + error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, error_description_nslcmop) + + db_nsr_update["detailed-status"] = error_description_nsr + db_nslcmop_update["detailed-status"] = error_detail + nslcmop_operation_state = "FAILED" + ns_state = "BROKEN" + else: + error_description_nsr = error_description_nslcmop = None + ns_state = "READY" + db_nsr_update["detailed-status"] = "Done" + db_nslcmop_update["detailed-status"] = "Done" + nslcmop_operation_state = "COMPLETED" - except DbException as e: - self.logger.error(logging_text + "Cannot update database: {}".format(e)) + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=ns_state, + current_operation="IDLE", + current_operation_id=None, + error_description=error_description_nsr, + other_update=db_nsr_update + ) + if db_nslcmop: + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) if nslcmop_operation_state: try: @@ -2021,11 +2038,9 @@ class NsLcm(LcmBase): self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e)) return False - async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds): + async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info): # Launch kdus if present in the descriptor - deployed_ok = True - k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}} def _get_cluster_id(cluster_id, cluster_type): @@ -2043,84 +2058,71 @@ class NsLcm(LcmBase): return k8s_id logging_text += "Deploy kdus: " + step = "" try: db_nsr_update = {"_admin.deployed.K8s": []} self.update_db_2("nsrs", nsr_id, db_nsr_update) - # Look for all vnfds - pending_tasks = {} index = 0 + updated_cluster_list = [] + for vnfr_data in db_vnfrs.values(): for kdur in get_iterable(vnfr_data, "kdur"): desc_params = self._format_additional_params(kdur.get("additionalParams")) - kdumodel = None - k8sclustertype = None - error_text = None - cluster_uuid = None vnfd_id = vnfr_data.get('vnfd-id') pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir')) if kdur.get("helm-chart"): kdumodel = kdur["helm-chart"] - k8sclustertype = "chart" - k8sclustertype_full = "helm-chart" + k8sclustertype = "helm-chart" elif kdur.get("juju-bundle"): kdumodel = kdur["juju-bundle"] - k8sclustertype = "juju" - k8sclustertype_full = "juju-bundle" + k8sclustertype = "juju-bundle" else: - error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \ - " running" + raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor " + "juju-bundle. Maybe an old NBI version is running". + format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"])) # check if kdumodel is a file and exists try: # path format: /vnfdid/pkkdir/kdumodel - filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel) + filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel) if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'): kdumodel = self.fs.path + filename - except Exception: - # it is not a file + except asyncio.CancelledError: + raise + except Exception: # it is not a file pass - step = "Prepare instantiate KDU {} in k8s cluster {}".format( - kdur["kdu-name"], kdur["k8s-cluster"]["id"]) - - try: - if not error_text: - cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full) - - updated_cluster_list = [] - if k8sclustertype == "chart" and cluster_uuid not in updated_cluster_list: - del_repo_list, added_repo_dict = await asyncio.ensure_future( - self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid)) - if del_repo_list or added_repo_dict: - unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list} - updated = {'_admin.helm_charts_added.' + - item: name for item, name in added_repo_dict.items()} - self.logger.debug(logging_text + "repos synchronized, to_delete: {}, to_add: {}". - format(del_repo_list, added_repo_dict)) - self.db.set_one("k8sclusters", {"_id": kdur["k8s-cluster"]["id"]}, - updated, unset=unset) - updated_cluster_list.append(cluster_uuid) - - except LcmException as e: - error_text = str(e) - deployed_ok = False - - step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid) + k8s_cluster_id = kdur["k8s-cluster"]["id"] + step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id) + cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype) + + if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list: + del_repo_list, added_repo_dict = await asyncio.ensure_future( + self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid)) + if del_repo_list or added_repo_dict: + unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list} + updated = {'_admin.helm_charts_added.' + + item: name for item, name in added_repo_dict.items()} + self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, " + "to_add: {}".format(k8s_cluster_id, del_repo_list, + added_repo_dict)) + self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset) + updated_cluster_list.append(cluster_uuid) + + step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"], + kdur["kdu-name"], k8s_cluster_id) k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid, "k8scluster-type": k8sclustertype, "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel} - if error_text: - k8s_instace_info["detailed-status"] = error_text db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info self.update_db_2("nsrs", nsr_id, db_nsr_update) - if error_text: - continue - db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s." - "{}".format(index)} + db_dict = {"collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": "_admin.deployed.K8s.{}".format(index)} - if k8sclustertype == "chart": + if k8sclustertype == "helm-chart": task = asyncio.ensure_future( self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True, params=desc_params, db_dict=db_dict, timeout=3600) @@ -2133,34 +2135,19 @@ class NsLcm(LcmBase): kdu_name=kdur["kdu-name"]) ) - pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index) - index += 1 + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) + task_instantiation_info[task] = "Deploy KDU {}".format(kdur["kdu-name"]) - if pending_tasks: - self.logger.debug(logging_text + 'Waiting for terminate pending tasks...') - pending_list = list(pending_tasks.keys()) - while pending_list: - done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60, - return_when=asyncio.FIRST_COMPLETED) - if not done_list: # timeout - for task in pending_list: - db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout" - deployed_ok = False - break - for task in done_list: - exc = task.exception() - if exc: - db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc) - deployed_ok = False - else: - db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result() - - if not deployed_ok: - raise LcmException('Cannot deploy KDUs') + index += 1 + except (LcmException, asyncio.CancelledError): + raise except Exception as e: - msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e) - self.logger.error(msg) + msg = "Exception {} while {}: {}".format(type(e).__name__, step, e) + if isinstance(e, (N2VCException, DbException)): + self.logger.error(logging_text + msg) + else: + self.logger.critical(logging_text + msg, exc_info=True) raise LcmException(msg) finally: if db_nsr_update: @@ -2168,7 +2155,7 @@ class NsLcm(LcmBase): def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id, kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config, - base_folder, task_instantiation_list, task_instantiation_info): + base_folder, task_instantiation_info, stage): # launch instantiate_N2VC in a asyncio task and register task object # Look where information of this charm is at database ._admin.deployed.VCA # if not found, create one entry and update database @@ -2221,12 +2208,13 @@ class NsLcm(LcmBase): deploy_params=deploy_params, config_descriptor=descriptor_config, base_folder=base_folder, - nslcmop_id=nslcmop_id + nslcmop_id=nslcmop_id, + stage=stage ) ) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) - task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "") - task_instantiation_list.append(task_n2vc) + task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format( + member_vnf_index or "", vdu_id or "") # Check if this VNFD has a configured terminate action def _has_terminate_config_primitive(self, vnfd): @@ -2350,7 +2338,7 @@ class NsLcm(LcmBase): def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive, mapped_primitive_params, operationState=None, detailed_status=None, operationType=None, RO_nsr_id=None, RO_scaling_info=None): - if not (db_nslcmop): + if not db_nslcmop: return self.SUBOPERATION_STATUS_NOT_FOUND # Get the "_admin.operations" list, if it exists db_nslcmop_admin = db_nslcmop.get('_admin', {}) @@ -2448,35 +2436,32 @@ class NsLcm(LcmBase): # Function to return execution_environment id def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list): + # TODO vdu_index_count for vca in vca_deployed_list: if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id: return vca["ee_id"] - # Helper methods for terminate() - - async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id): - """ Create a primitive with params from VNFD - Called from terminate() before deleting instance - Calls action() to execute the primitive """ - logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id) - db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] - db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) - db_vnfds = {} - # Loop over VNFRs - for vnfr in db_vnfrs_list: - vnfd_id = vnfr["vnfd-id"] - vnf_index = vnfr["member-vnf-index-ref"] - if vnfd_id not in db_vnfds: - step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id) - vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) - db_vnfds[vnfd_id] = vnfd - vnfd = db_vnfds[vnfd_id] - if not self._has_terminate_config_primitive(vnfd): - continue - # Get the primitive's sorted sequence list - seq_list = self._get_terminate_config_primitive_seq_list(vnfd) - for seq in seq_list: + async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True): + """ + Execute the terminate primitives and destroy the execution environment (if destroy_ee=False + :param logging_text: + :param db_nslcmop: + :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA. + :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu + :param vca_index: index in the database _admin.deployed.VCA + :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once + :return: None or exception + """ + # execute terminate_primitives + terminate_primitives = config_descriptor.get("terminate-config-primitive") + vdu_id = vca_deployed.get("vdu_id") + vdu_count_index = vca_deployed.get("vdu_count_index") + vdu_name = vca_deployed.get("vdu_name") + vnf_index = vca_deployed.get("member-vnf-index") + if terminate_primitives and vca_deployed.get("needed_terminate"): + # Get all 'seq' tags in seq_list, order sequences numerically, ascending. + terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq'])) + for seq in terminate_primitives: # For each sequence in list, get primitive and call _ns_execute_primitive() step = "Calling terminate action for vnf_member_index={} primitive={}".format( vnf_index, seq.get("name")) @@ -2486,12 +2471,9 @@ class NsLcm(LcmBase): mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index) # The following 3 parameters are currently set to None for 'terminate': # vdu_id, vdu_count_index, vdu_name - vdu_id = db_nslcmop["operationParams"].get("vdu_id") - vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index") - vdu_name = db_nslcmop["operationParams"].get("vdu_name") + # Add sub-operation self._add_suboperation(db_nslcmop, - nslcmop_id, vnf_index, vdu_id, vdu_count_index, @@ -2499,32 +2481,22 @@ class NsLcm(LcmBase): primitive, mapped_primitive_params) # Sub-operations: Call _ns_execute_primitive() instead of action() - # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - # nsr_deployed = db_nsr["_admin"]["deployed"] - - # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action( - # nsr_id, nslcmop_terminate_action_id) - # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED'] - # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED'] - # if result not in result_ok: - # raise LcmException( - # "terminate_primitive_action for vnf_member_index={}", - # " primitive={} fails with error {}".format( - # vnf_index, seq.get("name"), result_detail)) - - ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list) try: - await self.n2vc.exec_primitive( - ee_id=ee_id, - primitive_name=primitive, - params_dict=mapped_primitive_params - ) - except Exception as e: - self.logger.error('Error executing primitive {}: {}'.format(primitive, e)) - raise LcmException( - "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}" - .format(vnf_index, seq.get("name"), e), - ) + result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive, + mapped_primitive_params) + except LcmException: + # this happens when VCA is not deployed. In this case it is not needed to terminate + continue + result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED'] + if result not in result_ok: + raise LcmException("terminate_primitive {} for vnf_member_index={} fails with " + "error {}".format(seq.get("name"), vnf_index, result_detail)) + # set that this VCA do not need terminated + db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index) + self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}) + + if destroy_ee: + await self.n2vc.delete_execution_environment(vca_deployed["ee_id"]) async def _delete_N2VC(self, nsr_id: str): self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING') @@ -2532,8 +2504,147 @@ class NsLcm(LcmBase): await self.n2vc.delete_namespace(namespace=namespace) self._write_all_config_status(nsr_id=nsr_id, status='DELETED') - async def terminate(self, nsr_id, nslcmop_id): + async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage): + """ + Terminates a deployment from RO + :param logging_text: + :param nsr_deployed: db_nsr._admin.deployed + :param nsr_id: + :param nslcmop_id: + :param stage: list of string with the content to write on db_nslcmop.detailed-status. + this method will update only the index 2, but it will write on database the concatenated content of the list + :return: + """ + db_nsr_update = {} + failed_detail = [] + ro_nsr_id = ro_delete_action = None + if nsr_deployed and nsr_deployed.get("RO"): + ro_nsr_id = nsr_deployed["RO"].get("nsr_id") + ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id") + try: + if ro_nsr_id: + stage[2] = "Deleting ns from VIM." + db_nsr_update["detailed-status"] = " ".join(stage) + self._write_op_status(nslcmop_id, stage) + self.logger.debug(logging_text + stage[2]) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + desc = await self.RO.delete("ns", ro_nsr_id) + ro_delete_action = desc["action_id"] + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action + db_nsr_update["_admin.deployed.RO.nsr_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" + if ro_delete_action: + # wait until NS is deleted from VIM + stage[2] = "Waiting ns deleted from VIM." + detailed_status_old = None + self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id, + ro_delete_action)) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + + delete_timeout = 20 * 60 # 20 minutes + while delete_timeout > 0: + desc = await self.RO.show( + "ns", + item_id_name=ro_nsr_id, + extra_item="action", + extra_item_id=ro_delete_action) + + # deploymentStatus + self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc) + + ns_status, ns_status_info = self.RO.check_action_status(desc) + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + stage[2] = "Deleting from VIM {}".format(ns_status_info) + elif ns_status == "ACTIVE": + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" + break + else: + assert False, "ROclient.check_action_status returns unknown {}".format(ns_status) + if stage[2] != detailed_status_old: + detailed_status_old = stage[2] + db_nsr_update["detailed-status"] = " ".join(stage) + self._write_op_status(nslcmop_id, stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + await asyncio.sleep(5, loop=self.loop) + delete_timeout -= 5 + else: # delete_timeout <= 0: + raise ROclient.ROClientException("Timeout waiting ns deleted from VIM") + except Exception as e: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found + db_nsr_update["_admin.deployed.RO.nsr_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None + self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)) + elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict + failed_detail.append("RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) + else: + failed_detail.append("RO_ns_id={} delete error: {}".format(ro_nsr_id, e)) + self.logger.error(logging_text + failed_detail[-1]) + + # Delete nsd + if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")): + ro_nsd_id = nsr_deployed["RO"]["nsd_id"] + try: + stage[2] = "Deleting nsd from RO." + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + await self.RO.delete("nsd", ro_nsd_id) + self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)) + db_nsr_update["_admin.deployed.RO.nsd_id"] = None + except Exception as e: + if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found + db_nsr_update["_admin.deployed.RO.nsd_id"] = None + self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)) + elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict + failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) + else: + failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)) + self.logger.error(logging_text + failed_detail[-1]) + + if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")): + for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]): + if not vnf_deployed or not vnf_deployed["id"]: + continue + try: + ro_vnfd_id = vnf_deployed["id"] + stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format( + vnf_deployed["member-vnf-index"], ro_vnfd_id) + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + await self.RO.delete("vnfd", ro_vnfd_id) + self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)) + db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None + except Exception as e: + if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found + db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None + self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)) + elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict + failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) + else: + failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)) + self.logger.error(logging_text + failed_detail[-1]) + + stage[2] = "Deleted from VIM" + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + + if failed_detail: + raise LcmException("Error while {}: {}".format(stage[2], "; ".join(failed_detail))) + + async def terminate(self, nsr_id, nslcmop_id): # Try to lock HA task here task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) if not task_is_locked_by_me: @@ -2541,280 +2652,201 @@ class NsLcm(LcmBase): logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") + timeout_ns_terminate = self.timeout_ns_terminate db_nsr = None db_nslcmop = None exc = None - failed_detail = [] # annotates all failed error messages - db_nsr_update = {"_admin.nslcmop": nslcmop_id, - "_admin.current-operation": nslcmop_id, - "_admin.operation-type": "terminate"} - self.update_db_2("nsrs", nsr_id, db_nsr_update) + error_list = [] # annotates all failed error messages db_nslcmop_update = {} - nslcmop_operation_state = None autoremove = False # autoremove after terminated - pending_tasks = [] + tasks_dict_info = {} + db_nsr_update = {} + stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""] + # ^ contains [stage, step, VIM-status] try: # wait for any previous tasks in process - step = "Waiting for previous operations to terminate" await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id) + stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + operation_params = db_nslcmop.get("operationParams") or {} + if operation_params.get("timeout_ns_terminate"): + timeout_ns_terminate = operation_params["timeout_ns_terminate"] + stage[1] = "Getting nsr={} from db.".format(nsr_id) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + + db_nsr_update["operational-status"] = "terminating" + db_nsr_update["config-status"] = "terminating" self._write_ns_status( nsr_id=nsr_id, ns_state="TERMINATING", current_operation="TERMINATING", - current_operation_id=nslcmop_id + current_operation_id=nslcmop_id, + other_update=db_nsr_update ) self._write_op_status( op_id=nslcmop_id, - queuePosition=0 + queuePosition=0, + stage=stage ) - - step = "Getting nslcmop={} from db".format(nslcmop_id) - db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) - step = "Getting nsr={} from db".format(nsr_id) - db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - # nsd = db_nsr["nsd"] - nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) + nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {} if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED": return - # #TODO check if VIM is creating and wait - # RO_vim_id = db_vim["_admin"]["deployed"]["RO"] - # Call internal terminate action - await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id) - - pending_tasks = [] - - db_nsr_update["operational-status"] = "terminating" - db_nsr_update["config-status"] = "terminating" - - # remove NS - try: - step = "delete execution environment" - self.logger.debug(logging_text + step) - - task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id)) - # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id)) - - pending_tasks.append(task_delete_ee) - except Exception as e: - msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e) - self.logger.error(msg) - failed_detail.append(msg) - - try: - # Delete from k8scluster - step = "delete kdus" - self.logger.debug(logging_text + step) - # print(nsr_deployed) - if nsr_deployed: - for kdu in nsr_deployed.get("K8s", ()): - kdu_instance = kdu.get("kdu-instance") - if not kdu_instance: - continue - if kdu.get("k8scluster-type") == "chart": - task_delete_kdu_instance = asyncio.ensure_future( - self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"), - kdu_instance=kdu_instance)) - elif kdu.get("k8scluster-type") == "juju": - task_delete_kdu_instance = asyncio.ensure_future( - self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"), - kdu_instance=kdu_instance)) - else: - self.error(logging_text + "Unknown k8s deployment type {}". - format(kdu.get("k8scluster-type"))) - continue - pending_tasks.append(task_delete_kdu_instance) - except LcmException as e: - msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e) - self.logger.error(msg) - failed_detail.append(msg) - - # remove from RO - RO_fail = False - - # Delete ns - RO_nsr_id = RO_delete_action = None - if nsr_deployed and nsr_deployed.get("RO"): - RO_nsr_id = nsr_deployed["RO"].get("nsr_id") - RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id") - try: - if RO_nsr_id: - step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \ - "Deleting ns from VIM" - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - self.update_db_2("nsrs", nsr_id, db_nsr_update) - self.logger.debug(logging_text + step) - desc = await self.RO.delete("ns", RO_nsr_id) - RO_delete_action = desc["action_id"] - db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action - db_nsr_update["_admin.deployed.RO.nsr_id"] = None - db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" - if RO_delete_action: - # wait until NS is deleted from VIM - step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\ - format(RO_nsr_id, RO_delete_action) - detailed_status_old = None - self.logger.debug(logging_text + step) - - delete_timeout = 20 * 60 # 20 minutes - while delete_timeout > 0: - desc = await self.RO.show( - "ns", - item_id_name=RO_nsr_id, - extra_item="action", - extra_item_id=RO_delete_action) - - # deploymentStatus - self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc) - - ns_status, ns_status_info = self.RO.check_action_status(desc) - if ns_status == "ERROR": - raise ROclient.ROClientException(ns_status_info) - elif ns_status == "BUILD": - detailed_status = step + "; {}".format(ns_status_info) - elif ns_status == "ACTIVE": - db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None - db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" - break - else: - assert False, "ROclient.check_action_status returns unknown {}".format(ns_status) - if detailed_status != detailed_status_old: - detailed_status_old = db_nslcmop_update["detailed-status"] = \ - db_nsr_update["detailed-status"] = detailed_status - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - self.update_db_2("nsrs", nsr_id, db_nsr_update) - await asyncio.sleep(5, loop=self.loop) - delete_timeout -= 5 - else: # delete_timeout <= 0: - raise ROclient.ROClientException("Timeout waiting ns deleted from VIM") - - except ROclient.ROClientException as e: - if e.http_code == 404: # not found - db_nsr_update["_admin.deployed.RO.nsr_id"] = None - db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" - db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None - self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id)) - elif e.http_code == 409: # conflict - failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e)) - self.logger.debug(logging_text + failed_detail[-1]) - RO_fail = True - else: - failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e)) - self.logger.error(logging_text + failed_detail[-1]) - RO_fail = True - # Delete nsd - if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"): - RO_nsd_id = nsr_deployed["RO"]["nsd_id"] - try: - step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\ - "Deleting nsd from RO" - await self.RO.delete("nsd", RO_nsd_id) - self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id)) - db_nsr_update["_admin.deployed.RO.nsd_id"] = None - except ROclient.ROClientException as e: - if e.http_code == 404: # not found - db_nsr_update["_admin.deployed.RO.nsd_id"] = None - self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id)) - elif e.http_code == 409: # conflict - failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e)) - self.logger.debug(logging_text + failed_detail[-1]) - RO_fail = True + stage[1] = "Getting vnf descriptors from db." + db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + db_vnfds_from_id = {} + db_vnfds_from_member_index = {} + # Loop over VNFRs + for vnfr in db_vnfrs_list: + vnfd_id = vnfr["vnfd-id"] + if vnfd_id not in db_vnfds_from_id: + vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + db_vnfds_from_id[vnfd_id] = vnfd + db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id] + + # Destroy individual execution environments when there are terminating primitives. + # Rest of EE will be deleted at once + if not operation_params.get("skip_terminate_primitives"): + stage[0] = "Stage 2/3 execute terminating primitives." + stage[1] = "Looking execution environment that needs terminate." + self.logger.debug(logging_text + stage[1]) + for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): + config_descriptor = None + if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"): + continue + if not vca.get("member-vnf-index"): + # ns + config_descriptor = db_nsr.get("ns-configuration") + elif vca.get("vdu_id"): + db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] + vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None) + if vdud: + config_descriptor = vdud.get("vdu-configuration") + elif vca.get("kdu_name"): + db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] + kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None) + if kdud: + config_descriptor = kdud.get("kdu-configuration") else: - failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e)) - self.logger.error(logging_text + failed_detail[-1]) - RO_fail = True + config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration") + task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, + vca_index, False)) + tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) + + # wait for pending tasks of terminate primitives + if tasks_dict_info: + self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...') + error_list = await self._wait_for_tasks(logging_text, tasks_dict_info, + min(self.timeout_charm_delete, timeout_ns_terminate), + stage, nslcmop_id) + if error_list: + return # raise LcmException("; ".join(error_list)) + tasks_dict_info.clear() + + # remove All execution environments at once + stage[0] = "Stage 3/3 delete all." + stage[1] = "Deleting all execution environments." + self.logger.debug(logging_text + stage[1]) + + task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id)) + # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id)) + tasks_dict_info[task_delete_ee] = "Terminating all VCA" + + # Delete from k8scluster + stage[1] = "Deleting KDUs." + self.logger.debug(logging_text + stage[1]) + # print(nsr_deployed) + for kdu in get_iterable(nsr_deployed, "K8s"): + if not kdu or not kdu.get("kdu-instance"): + continue + kdu_instance = kdu.get("kdu-instance") + if kdu.get("k8scluster-type") in ("helm-chart", "chart"): + task_delete_kdu_instance = asyncio.ensure_future( + self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"), + kdu_instance=kdu_instance)) + elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"): + task_delete_kdu_instance = asyncio.ensure_future( + self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"), + kdu_instance=kdu_instance)) + else: + self.logger.error(logging_text + "Unknown k8s deployment type {}". + format(kdu.get("k8scluster-type"))) + continue + tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name")) - if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"): - for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]): - if not vnf_deployed or not vnf_deployed["id"]: - continue - try: - RO_vnfd_id = vnf_deployed["id"] - step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\ - "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format( - vnf_deployed["member-vnf-index"], RO_vnfd_id) - await self.RO.delete("vnfd", RO_vnfd_id) - self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id)) - db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None - except ROclient.ROClientException as e: - if e.http_code == 404: # not found - db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None - self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id)) - elif e.http_code == 409: # conflict - failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e)) - self.logger.debug(logging_text + failed_detail[-1]) - else: - failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e)) - self.logger.error(logging_text + failed_detail[-1]) + # remove from RO + stage[1] = "Deleting ns from VIM." + task_delete_ro = asyncio.ensure_future( + self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage)) + tasks_dict_info[task_delete_ro] = "Removing deployment from VIM" - if failed_detail: - terminate_ok = False - self.logger.error(logging_text + " ;".join(failed_detail)) - db_nsr_update["operational-status"] = "failed" - db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail) - db_nslcmop_update["detailed-status"] = "; ".join(failed_detail) - db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" - db_nslcmop_update["statusEnteredTime"] = time() - else: - terminate_ok = True - db_nsr_update["operational-status"] = "terminated" - db_nsr_update["detailed-status"] = "Done" - db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" - db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" - db_nslcmop_update["statusEnteredTime"] = time() - if db_nslcmop["operationParams"].get("autoremove"): - autoremove = True + # rest of staff will be done at finally - except (ROclient.ROClientException, DbException, LcmException) as e: + except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except asyncio.CancelledError: - self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1])) exc = "Operation was cancelled" except Exception as e: exc = traceback.format_exc() - self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) + self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True) finally: - if exc and db_nslcmop: - db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" - db_nslcmop_update["statusEnteredTime"] = time() + if exc: + error_list.append(str(exc)) try: - if db_nslcmop and db_nslcmop_update: - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - if db_nsr: - db_nsr_update["_admin.nslcmop"] = None - db_nsr_update["_admin.current-operation"] = None - db_nsr_update["_admin.operation-type"] = None - self.update_db_2("nsrs", nsr_id, db_nsr_update) - - if terminate_ok: - ns_state = "IDLE" - error_description = None - error_detail = None - else: - ns_state = "BROKEN" - error_detail = "; ".join(failed_detail) - error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\ - .format(nslcmop_id, step, error_detail) + # wait for pending tasks + if tasks_dict_info: + stage[1] = "Waiting for terminate pending tasks." + self.logger.debug(logging_text + stage[1]) + error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate, + stage, nslcmop_id) + stage[1] = stage[2] = "" + except asyncio.CancelledError: + error_list.append("Cancelled") + # TODO cancell all tasks + except Exception as exc: + error_list.append(str(exc)) + # update status at database + if error_list: + error_detail = "; ".join(error_list) + # self.logger.error(logging_text + error_detail) + error_description_nslcmop = 'Stage: {} {} Detail: {}'.format(stage[0], stage[1], error_detail) + error_description_nsr = 'Operation: TERMINATING.{}, {}'.format(nslcmop_id, error_description_nslcmop) - self._write_ns_status( - nsr_id=nsr_id, - ns_state=ns_state, - current_operation="IDLE", - current_operation_id=None, - error_description=error_description - ) - - self._write_op_status( - op_id=nslcmop_id, - error_message=error_description - ) + db_nsr_update["operational-status"] = "failed" + db_nsr_update["detailed-status"] = error_description_nsr + db_nslcmop_update["detailed-status"] = error_detail + nslcmop_operation_state = "FAILED" + ns_state = "BROKEN" + else: + error_description_nsr = error_description_nslcmop = None + ns_state = "NOT_INSTANTIATED" + db_nsr_update["operational-status"] = "terminated" + db_nsr_update["detailed-status"] = "Done" + db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" + db_nslcmop_update["detailed-status"] = "Done" + nslcmop_operation_state = "COMPLETED" - except DbException as e: - self.logger.error(logging_text + "Cannot update database: {}".format(e)) + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=ns_state, + current_operation="IDLE", + current_operation_id=None, + error_description=error_description_nsr, + other_update=db_nsr_update + ) + if db_nslcmop: + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + autoremove = operation_params.get("autoremove", False) if nslcmop_operation_state: try: await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, @@ -2824,20 +2856,53 @@ class NsLcm(LcmBase): except Exception as e: self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - # wait for pending tasks - done = None - pending = None - if pending_tasks: - self.logger.debug(logging_text + 'Waiting for terminate pending tasks...') - done, pending = await asyncio.wait(pending_tasks, timeout=3600) - if not pending: - self.logger.debug(logging_text + 'All tasks finished...') - else: - self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending)) - self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") + async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None): + time_start = time() + error_list = [] + pending_tasks = list(created_tasks_info.keys()) + num_tasks = len(pending_tasks) + num_done = 0 + stage[1] = "{}/{}.".format(num_done, num_tasks) + self._write_op_status(nslcmop_id, stage) + new_error = False + while pending_tasks: + _timeout = timeout + time_start - time() + done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout, + return_when=asyncio.FIRST_COMPLETED) + num_done += len(done) + if not done: # Timeout + for task in pending_tasks: + error_list.append(created_tasks_info[task] + ": Timeout") + break + for task in done: + if task.cancelled(): + self.logger.warn(logging_text + created_tasks_info[task] + ": Cancelled") + error_list.append(created_tasks_info[task] + ": Cancelled") + new_error = True + else: + exc = task.exception() + if exc: + error_list.append(created_tasks_info[task] + ": {}".format(exc)) + new_error = True + if isinstance(exc, (DbException, N2VCException, ROclient.ROClientException, LcmException)): + self.logger.error(logging_text + created_tasks_info[task] + ": " + str(exc)) + else: + exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__)) + self.logger.error(logging_text + created_tasks_info[task] + exc_traceback) + else: + self.logger.debug(logging_text + created_tasks_info[task] + ": Done") + stage[1] = "{}/{}.".format(num_done, num_tasks) + if new_error: + stage[1] += "Errors: " + ". ".join(error_list) + "." + new_error = False + if nsr_id: # update also nsr + self.update_db_2("nsrs", nsr_id, {"errorDescription": ". ".join(error_list)}) + self._write_op_status(nslcmop_id, stage) + return error_list + @staticmethod def _map_primitive_params(primitive_desc, params, instantiation_params): """ @@ -2881,33 +2946,36 @@ class NsLcm(LcmBase): calculated_params["ns_config_info"] = instantiation_params["ns_config_info"] return calculated_params - async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index, - primitive, primitive_params, retries=0, retries_interval=30) -> (str, str): - - # find vca_deployed record for this action + def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_name, vdu_count_index, kdu_name=None): + # find vca_deployed record for this action. Raise LcmException if not found or there is not any id. + for vca in deployed_vca: + if not vca: + continue + if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]: + continue + if vdu_name and vdu_name != vca["vdu_name"]: + continue + if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]: + continue + if kdu_name and kdu_name != vca["kdu_name"]: + continue + break + else: + # vca_deployed not found + raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not " + "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index)) + + # get ee_id + ee_id = vca.get("ee_id") + if not ee_id: + raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not " + "execution environment" + .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index)) + return ee_id + + async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, + retries_interval=30) -> (str, str): try: - for vca_deployed in db_deployed["VCA"]: - if not vca_deployed: - continue - if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]: - continue - if vdu_name and vdu_name != vca_deployed["vdu_name"]: - continue - if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]: - continue - break - else: - # vca_deployed not found - raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not " - "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index)) - - # get ee_id - ee_id = vca_deployed.get("ee_id") - if not ee_id: - raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not " - "execution environment" - .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index)) - if primitive == "config": primitive_params = {"params": primitive_params} @@ -2927,12 +2995,14 @@ class NsLcm(LcmBase): # wait and retry await asyncio.sleep(retries_interval, loop=self.loop) else: - return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL' + return 'FAIL', 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e) - return output, 'OK' + return 'COMPLETED', output + except LcmException: + raise except Exception as e: - return 'Error executing action {}: {}'.format(primitive, e), 'FAIL' + return 'FAIL', 'Error executing action {}: {}'.format(primitive, e) async def action(self, nsr_id, nslcmop_id): @@ -2946,10 +3016,7 @@ class NsLcm(LcmBase): # get all needed from database db_nsr = None db_nslcmop = None - db_nsr_update = {"_admin.nslcmop": nslcmop_id, - "_admin.current-operation": nslcmop_id, - "_admin.operation-type": "action"} - self.update_db_2("nsrs", nsr_id, db_nsr_update) + db_nsr_update = {} db_nslcmop_update = {} nslcmop_operation_state = None nslcmop_operation_state_detail = None @@ -3029,13 +3096,13 @@ class NsLcm(LcmBase): if len(parts) == 2: kdu_model = parts[0] - if kdu.get("k8scluster-type") == "chart": + if kdu.get("k8scluster-type") in ("helm-chart", "chart"): output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance"), atomic=True, kdu_model=kdu_model, params=desc_params, db_dict=db_dict, timeout=300) - elif kdu.get("k8scluster-type") == "juju": + elif kdu.get("k8scluster-type")in ("juju-bundle", "juju"): output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance"), atomic=True, kdu_model=kdu_model, @@ -3049,11 +3116,11 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output)) break elif primitive == "rollback": - if kdu.get("k8scluster-type") == "chart": + if kdu.get("k8scluster-type") in ("helm-chart", "chart"): output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance"), db_dict=db_dict) - elif kdu.get("k8scluster-type") == "juju": + elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"): output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance"), db_dict=db_dict) @@ -3062,10 +3129,10 @@ class NsLcm(LcmBase): raise LcmException(msg) break elif primitive == "status": - if kdu.get("k8scluster-type") == "chart": + if kdu.get("k8scluster-type") in ("helm-chart", "chart"): output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance")) - elif kdu.get("k8scluster-type") == "juju": + elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"): output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance")) else: @@ -3113,21 +3180,15 @@ class NsLcm(LcmBase): desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"])) # TODO check if ns is in a proper status - output, detail = await self._ns_execute_primitive( - db_deployed=nsr_deployed, - member_vnf_index=vnf_index, - vdu_id=vdu_id, - vdu_name=vdu_name, - vdu_count_index=vdu_count_index, + result, detailed_status = await self._ns_execute_primitive( + self._look_for_deployed_vca(nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=vdu_id, + vdu_name=vdu_name, + vdu_count_index=vdu_count_index), primitive=primitive, primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)) - detailed_status = output - if detail == 'OK': - result = 'COMPLETED' - else: - result = 'FAILED' - db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status db_nslcmop_update["operationState"] = nslcmop_operation_state = result db_nslcmop_update["statusEnteredTime"] = time() @@ -3153,16 +3214,12 @@ class NsLcm(LcmBase): if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr: - db_nsr_update["_admin.nslcmop"] = None - db_nsr_update["_admin.operation-type"] = None - db_nsr_update["_admin.nslcmop"] = None - db_nsr_update["_admin.current-operation"] = None - self.update_db_2("nsrs", nsr_id, db_nsr_update) self._write_ns_status( nsr_id=nsr_id, ns_state=None, current_operation="IDLE", - current_operation_id=None + current_operation_id=None, + other_update=db_nsr_update ) if exc: self._write_op_status( @@ -3197,10 +3254,7 @@ class NsLcm(LcmBase): db_nslcmop = None db_nslcmop_update = {} nslcmop_operation_state = None - db_nsr_update = {"_admin.nslcmop": nslcmop_id, - "_admin.current-operation": nslcmop_id, - "_admin.operation-type": "scale"} - self.update_db_2("nsrs", nsr_id, db_nsr_update) + db_nsr_update = {} exc = None # in case of error, indicates what part of scale was failed to put nsr at error status scale_process = None @@ -3395,7 +3449,12 @@ class NsLcm(LcmBase): format(vnf_config_primitive)) # Execute the primitive, either with new (first-time) or registered (reintent) args result, result_detail = await self._ns_execute_primitive( - nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params) + self._look_for_deployed_vca(nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=None, + vdu_name=None, + vdu_count_index=None), + vnf_config_primitive, primitive_params) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -3584,7 +3643,12 @@ class NsLcm(LcmBase): format(vnf_config_primitive)) # Execute the primitive, either with new (first-time) or registered (reintent) args result, result_detail = await self._ns_execute_primitive( - nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params) + self._look_for_deployed_vca(nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=None, + vdu_name=None, + vdu_count_index=None), + vnf_config_primitive, primitive_params) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -3630,7 +3694,6 @@ class NsLcm(LcmBase): db_nsr_update["operational-status"] = old_operational_status db_nsr_update["config-status"] = old_config_status db_nsr_update["detailed-status"] = "" - db_nsr_update["_admin.nslcmop"] = None if scale_process: if "VCA" in scale_process: db_nsr_update["config-status"] = "failed" @@ -3642,16 +3705,12 @@ class NsLcm(LcmBase): if db_nslcmop and db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr: - db_nsr_update["_admin.current-operation"] = None - db_nsr_update["_admin.operation-type"] = None - db_nsr_update["_admin.nslcmop"] = None - self.update_db_2("nsrs", nsr_id, db_nsr_update) - self._write_ns_status( nsr_id=nsr_id, ns_state=None, current_operation="IDLE", - current_operation_id=None + current_operation_id=None, + other_update=db_nsr_update ) except DbException as e: diff --git a/osm_lcm/tests/test_db_descriptors.py b/osm_lcm/tests/test_db_descriptors.py index 93fdf85..a7f4ccf 100644 --- a/osm_lcm/tests/test_db_descriptors.py +++ b/osm_lcm/tests/test_db_descriptors.py @@ -237,6 +237,8 @@ db_nsrs_text = """ member-vnf-index: '2' VCA: - application: alf-b-aa + ee_id: f48163a6-c807-47bc-9682-f72caef5af85.alf-b-aa + needed_terminate: True detailed-status: Ready! member-vnf-index: '1' model: f48163a6-c807-47bc-9682-f72caef5af85 @@ -249,6 +251,8 @@ db_nsrs_text = """ vdu_name: null vnfd_id: hackfest3charmed-vnf - application: alf-c-ab + ee_id: f48163a6-c807-47bc-9682-f72caef5af85.alf-c-ab + needed_terminate: True detailed-status: Ready! member-vnf-index: '2' model: f48163a6-c807-47bc-9682-f72caef5af85 diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index e54179b..77f1d38 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -25,7 +25,7 @@ from osm_common.dbmemory import DbMemory from osm_common.msgkafka import MsgKafka from osm_common.fslocal import FsLocal from osm_lcm.lcm_utils import TaskRegistry -# from osm_lcm.ROclient import ROClient +from osm_lcm.ROclient import ROClient from uuid import uuid4 # from asynctest.mock import patch @@ -200,6 +200,7 @@ class TestMyNS(asynctest.TestCase): self.my_ns.n2vc.GetPrimitiveStatus = asynctest.CoroutineMock(return_value="completed") self.my_ns.n2vc.GetPrimitiveOutput = asynctest.CoroutineMock(return_value={"result": "ok", "pubkey": pub_key}) + self.my_ns.n2vc.delete_execution_environment = asynctest.CoroutineMock(return_value=None) self.my_ns.n2vc.get_public_key = asynctest.CoroutineMock( return_value=getenv("OSMLCM_VCA_PUBKEY", "public_key")) self.my_ns.n2vc.delete_namespace = asynctest.CoroutineMock(return_value=None) @@ -484,22 +485,24 @@ class TestMyNS(asynctest.TestCase): async def test_deploy_kdus(self): nsr_id = descriptors.test_ids["TEST-KDU"]["ns"] - # nslcmop_id = descriptors.test_ids["TEST-KDU"]["instantiate"] + nslcmop_id = descriptors.test_ids["TEST-KDU"]["instantiate"] db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) db_vnfr = self.db.get_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "multikdu"}) db_vnfrs = {"multikdu": db_vnfr} db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) db_vnfds = {db_vnfd["_id"]: db_vnfd} + task_register = {} logging_text = "KDU" self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id") self.my_ns.k8sclusterhelm.synchronize_repos = asynctest.CoroutineMock(return_value=("", "")) - await self.my_ns.deploy_kdus(logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds) + await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register) + await asyncio.wait(list(task_register.keys()), timeout=100) db_nsr = self.db.get_list("nsrs")[1] self.assertIn("K8s", db_nsr["_admin"]["deployed"], "K8s entry not created at '_admin.deployed'") self.assertIsInstance(db_nsr["_admin"]["deployed"]["K8s"], list, "K8s entry is not of type list") self.assertEqual(len(db_nsr["_admin"]["deployed"]["K8s"]), 2, "K8s entry is not of type list") - k8s_instace_info = {"kdu-instance": "k8s_id", "k8scluster-uuid": "73d96432-d692-40d2-8440-e0c73aee209c", - "k8scluster-type": "chart", + k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": "73d96432-d692-40d2-8440-e0c73aee209c", + "k8scluster-type": "helm-chart", "kdu-name": "ldap", "kdu-model": "stable/openldap:1.2.1"} self.assertEqual(db_nsr["_admin"]["deployed"]["K8s"][0], k8s_instace_info) @@ -534,6 +537,58 @@ class TestMyNS(asynctest.TestCase): self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None") self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None") + @asynctest.fail_on(active_handles=True) # all async tasks must be completed + async def test_terminate_without_configuration(self): + nsr_id = descriptors.test_ids["TEST-A"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-A"]["terminate"] + # set instantiation task as completed + self.db.set_list("nslcmops", {"nsInstanceId": nsr_id, "_id.ne": nslcmop_id}, + update_dict={"operationState": "COMPLETED"}) + self.my_ns.RO.show = asynctest.CoroutineMock(ROClient.show, side_effect=self._ro_show(delete=nslcmop_id)) + self.db.set_one("nsrs", {"_id": nsr_id}, + update_dict={"_admin.deployed.VCA.0": None, "_admin.deployed.VCA.1": None}) + + await self.my_ns.terminate(nsr_id, nslcmop_id) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + self.assertEqual(db_nslcmop.get("operationState"), 'COMPLETED', db_nslcmop.get("detailed-status")) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + self.assertEqual(db_nsr.get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription "))) + self.assertEqual(db_nsr["_admin"].get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription "))) + self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'") + self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None") + self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None") + self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None") + + @asynctest.fail_on(active_handles=True) # all async tasks must be completed + async def test_terminate_primitive(self): + nsr_id = descriptors.test_ids["TEST-A"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-A"]["terminate"] + self.my_ns.RO.show = asynctest.CoroutineMock(ROClient.show, side_effect=self._ro_show(delete=nslcmop_id)) + # set instantiation task as completed + self.db.set_list("nslcmops", {"nsInstanceId": nsr_id, "_id.ne": nslcmop_id}, + update_dict={"operationState": "COMPLETED"}) + + # modify vnfd descriptor to include terminate_primitive + terminate_primitive = [{ + "name": "touch", + "parameter": [{"name": "filename", "value": "terminate_filename"}], + "seq": '1' + }] + db_vnfr = self.db.get_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "1"}) + self.db.set_one("vnfds", {"_id": db_vnfr["vnfd-id"]}, + {"vnf-configuration.terminate-config-primitive": terminate_primitive}) + + await self.my_ns.terminate(nsr_id, nslcmop_id) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + self.assertEqual(db_nslcmop.get("operationState"), 'COMPLETED', db_nslcmop.get("detailed-status")) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + self.assertEqual(db_nsr.get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription "))) + self.assertEqual(db_nsr["_admin"].get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription "))) + self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'") + self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None") + self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None") + self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None") + if __name__ == '__main__': asynctest.main() -- 2.17.1