X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=b9417f29960eb3157c12c16dcf48e8d2c072a8bf;hb=6e9d2eb347c5daca2a598b607dc6d51c15662406;hp=d257b45122b3c8e0adaa8463f94affd2767132f0;hpb=ca2e16a01c3907853231290ed610af6a76bc4c0e;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index d257b45..b9417f2 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -3,19 +3,17 @@ import asyncio import yaml -import ROclient import logging import logging.handlers import getopt import functools import sys import traceback -from osm_common import dbmemory -from osm_common import dbmongo -from osm_common import fslocal -from osm_common import msglocal -from osm_common import msgkafka -from osm_common.dbbase import DbException +import ROclient +# from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient +from osm_common import dbmemory, dbmongo, fslocal, msglocal, msgkafka +from osm_common import version as common_version +from osm_common.dbbase import DbException, deep_update from osm_common.fsbase import FsException from osm_common.msgbase import MsgException from os import environ, path @@ -29,9 +27,22 @@ from time import time __author__ = "Alfonso Tierno" -min_RO_version = [0, 5, 69] - - +min_RO_version = [0, 5, 72] +min_n2vc_version = "0.0.2" +min_common_version = "0.1.7" +# uncomment if LCM is installed as library and installed, and get them from __init__.py +lcm_version = '0.1.15' +lcm_version_date = '2018-09-13' + + +def versiontuple(v): + """utility for compare dot separate versions. Fills with zeros to proper number comparison""" + filled = [] + for point in v.split("."): + filled.append(point.zfill(8)) + return tuple(filled) + + class LcmException(Exception): pass @@ -123,9 +134,10 @@ class TaskRegistry: for task_name, task in self.task_registry[topic][_id][op_id].items(): if target_task_name and target_task_name != task_name: continue - result = task.cancel() - if result: - self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) + # result = + task.cancel() + # if result: + # self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) class Lcm: @@ -165,12 +177,12 @@ class Lcm: config["database"]["logger_name"] = "lcm.db" config["storage"]["logger_name"] = "lcm.fs" config["message"]["logger_name"] = "lcm.msg" - if "logfile" in config["global"]: + if config["global"].get("logfile"): file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0) file_handler.setFormatter(log_formatter_simple) self.logger.addHandler(file_handler) - else: + if not config["global"].get("nologging"): str_handler = logging.StreamHandler() str_handler.setFormatter(log_formatter_simple) self.logger.addHandler(str_handler) @@ -182,14 +194,14 @@ class Lcm: for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items(): config[k1]["logger_name"] = logname logger_module = logging.getLogger(logname) - if "logfile" in config[k1]: + if config[k1].get("logfile"): file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0) file_handler.setFormatter(log_formatter_simple) logger_module.addHandler(file_handler) - if "loglevel" in config[k1]: + if config[k1].get("loglevel"): logger_module.setLevel(config[k1]["loglevel"]) - + self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)) self.n2vc = N2VC( log=self.logger, server=config['VCA']['host'], @@ -205,8 +217,12 @@ class Lcm: # check version of N2VC # TODO enhance with int conversion or from distutils.version import LooseVersion # or with list(map(int, version.split("."))) - if N2VC_version < "0.0.2": - raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version)) + if versiontuple(N2VC_version) < versiontuple(min_n2vc_version): + raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format( + N2VC_version, min_n2vc_version)) + if versiontuple(common_version) < versiontuple("0.1.7"): + raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format( + common_version, min_common_version)) try: # TODO check database version @@ -657,8 +673,7 @@ class Lcm: if ci_file: ci_file.close() - def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, member_vnf_index, - task=None): + def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None): """ Callback both for charm status change and task completion :param model_name: Charm model name @@ -673,122 +688,67 @@ class Lcm: - removing, - removed :param message: detailed message error - :param db_nsr: nsr database content - :param db_nslcmop: nslcmop database content - :param member_vnf_index: NSD member-vnf-index + :param n2vc_info dictionary with information shared with instantiate task. Contains: + nsr_id: + nslcmop_id: + lcmOperationType: currently "instantiate" + deployed: dictionary with {: {operational-status: , detailed-status: }} + db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value + n2vc_event: event used to notify instantiation task that some change has been produced :param task: None for charm status change, or task for completion task callback :return: """ - nsr_id = None - nslcmop_id = None - db_nsr_update = {} - db_nslcmop_update = {} try: - nsr_id = db_nsr["_id"] - nslcmop_id = db_nslcmop["_id"] - nsr_lcm = db_nsr["_admin"]["deployed"] - ns_operation = db_nslcmop["lcmOperationType"] - logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_operation, nslcmop_id, - member_vnf_index) + nsr_id = n2vc_info["nsr_id"] + deployed = n2vc_info["deployed"] + db_nsr_update = n2vc_info["db_update"] + nslcmop_id = n2vc_info["nslcmop_id"] + ns_operation = n2vc_info["lcmOperationType"] + n2vc_event = n2vc_info["n2vc_event"] + logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id, + application_name) + vca_deployed = deployed.get(application_name) + if not vca_deployed: + self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA") + return if task: if task.cancelled(): self.logger.debug(logging_text + " task Cancelled") - # TODO update db_nslcmop - return + vca_deployed['operational-status'] = "error" + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error" + vca_deployed['detailed-status'] = "Task Cancelled" + db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = "Task Cancelled" - if task.done(): + elif task.done(): exc = task.exception() if exc: self.logger.error(logging_text + " task Exception={}".format(exc)) - if ns_operation in ("instantiate", "terminate"): - nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error" - db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = \ - "error" - nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc) - db_nsr_update[ - "_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(exc) - elif ns_operation == "action": - db_nslcmop_update["operationState"] = "FAILED" - db_nslcmop_update["detailed-status"] = str(exc) - db_nslcmop_update["statusEnteredTime"] = time() - return - + vca_deployed['operational-status'] = "error" + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error" + vca_deployed['detailed-status'] = str(exc) + db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(exc) else: self.logger.debug(logging_text + " task Done") - # TODO revise with Adam if action is finished and ok when task is done - if ns_operation == "action": - db_nslcmop_update["operationState"] = "COMPLETED" - db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["statusEnteredTime"] = time() # task is Done, but callback is still ongoing. So ignore return elif status: self.logger.debug(logging_text + " Enter status={}".format(status)) - if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status: + if vca_deployed['operational-status'] == status: return # same status, ignore - nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status - db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = status - nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message) - db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(message) + vca_deployed['operational-status'] = status + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = status + vca_deployed['detailed-status'] = str(message) + db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(message) else: self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True) return - - all_active = True - status_map = {} - n2vc_error_text = [] # contain text error list. If empty no one is in error status - for vnf_index, vca_info in nsr_lcm["VCA"].items(): - vca_status = vca_info["operational-status"] - if vca_status not in status_map: - # Initialize it - status_map[vca_status] = 0 - status_map[vca_status] += 1 - - if vca_status != "active": - all_active = False - if vca_status in ("error", "blocked"): - n2vc_error_text.append("member_vnf_index={} {}: {}".format(member_vnf_index, vca_status, - vca_info["detailed-status"])) - - if all_active: - self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, - member_vnf_index)) - db_nsr_update["config-status"] = "configured" - db_nsr_update["detailed-status"] = "done" - db_nslcmop_update["operationState"] = "COMPLETED" - db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["statusEnteredTime"] = time() - elif n2vc_error_text: - db_nsr_update["config-status"] = "failed" - error_text = "fail configuring " + ";".join(n2vc_error_text) - db_nsr_update["detailed-status"] = error_text - db_nslcmop_update["operationState"] = "FAILED_TEMP" - db_nslcmop_update["detailed-status"] = error_text - db_nslcmop_update["statusEnteredTime"] = time() - else: - cs = "configuring: " - separator = "" - for status, num in status_map.items(): - cs += separator + "{}: {}".format(status, num) - separator = ", " - db_nsr_update["config-status"] = cs - db_nsr_update["detailed-status"] = cs - db_nslcmop_update["detailed-status"] = cs - + # wake up instantiate task + n2vc_event.set() except Exception as e: - self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True) - finally: - try: - if db_nslcmop_update: - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - if db_nsr_update: - self.update_db_2("nsrs", nsr_id, db_nsr_update) - except Exception as e: - self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format( - member_vnf_index, e), exc_info=True) + self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True) - def ns_params_2_RO(self, ns_params): + def ns_params_2_RO(self, ns_params, nsd, vnfd_dict): """ Creates a RO ns descriptor from OSM ns_instantite params :param ns_params: OSM instantiate params @@ -808,6 +768,23 @@ class Lcm: vim_2_RO[vim_account] = RO_vim_id return RO_vim_id + def ip_profile_2_RO(ip_profile): + RO_ip_profile = deepcopy((ip_profile)) + if "dns-server" in RO_ip_profile: + if isinstance(RO_ip_profile["dns-server"], list): + RO_ip_profile["dns-address"] = [] + for ds in RO_ip_profile.pop("dns-server"): + RO_ip_profile["dns-address"].append(ds['address']) + else: + RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server") + if RO_ip_profile.get("ip-version") == "ipv4": + RO_ip_profile["ip-version"] = "IPv4" + if RO_ip_profile.get("ip-version") == "ipv6": + RO_ip_profile["ip-version"] = "IPv6" + if "dhcp-params" in RO_ip_profile: + RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params") + return RO_ip_profile + if not ns_params: return None RO_ns_params = { @@ -821,67 +798,144 @@ class Lcm: if ns_params.get("ssh-authorized-key"): RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]} if ns_params.get("vnf"): - for vnf in ns_params["vnf"]: - RO_vnf = {} - if "vimAccountId" in vnf: - RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"]) + for vnf_params in ns_params["vnf"]: + for constituent_vnfd in nsd["constituent-vnfd"]: + if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]: + vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]] + break + else: + raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:" + "constituent-vnfd".format(vnf_params["member-vnf-index"])) + RO_vnf = {"vdus": {}, "networks": {}} + if vnf_params.get("vimAccountId"): + RO_vnf["datacenter"] = vim_account_2_RO(vnf_params["vimAccountId"]) + if vnf_params.get("vdu"): + for vdu_params in vnf_params["vdu"]: + RO_vnf["vdus"][vdu_params["id"]] = {} + if vdu_params.get("volume"): + RO_vnf["vdus"][vdu_params["id"]]["devices"] = {} + for volume_params in vdu_params["volume"]: + RO_vnf["vdus"][vdu_params["id"]]["devices"][volume_params["name"]] = {} + if volume_params.get("vim-volume-id"): + RO_vnf["vdus"][vdu_params["id"]]["devices"][volume_params["name"]]["vim_id"] = \ + volume_params["vim-volume-id"] + if vdu_params.get("interface"): + RO_vnf["vdus"][vdu_params["id"]]["interfaces"] = {} + for interface_params in vdu_params["interface"]: + RO_interface = {} + RO_vnf["vdus"][vdu_params["id"]]["interfaces"][interface_params["name"]] = RO_interface + if interface_params.get("ip-address"): + RO_interface["ip_address"] = interface_params["ip-address"] + if interface_params.get("mac-address"): + RO_interface["mac_address"] = interface_params["mac-address"] + if interface_params.get("floating-ip-required"): + RO_interface["floating-ip"] = interface_params["floating-ip-required"] + if vnf_params.get("internal-vld"): + for internal_vld_params in vnf_params["internal-vld"]: + RO_vnf["networks"][internal_vld_params["name"]] = {} + if internal_vld_params.get("vim-network-name"): + RO_vnf["networks"][internal_vld_params["name"]]["vim-network-name"] = \ + internal_vld_params["vim-network-name"] + if internal_vld_params.get("ip-profile"): + RO_vnf["networks"][internal_vld_params["name"]]["ip-profile"] = \ + ip_profile_2_RO(internal_vld_params["ip-profile"]) + if internal_vld_params.get("internal-connection-point"): + for icp_params in internal_vld_params["internal-connection-point"]: + # look for interface + iface_found = False + for vdu_descriptor in vnf_descriptor["vdu"]: + for vdu_interface in vdu_descriptor["interface"]: + if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]: + RO_interface_update = {} + if icp_params.get("ip-address"): + RO_interface_update["ip_address"] = icp_params["ip-address"] + if icp_params.get("mac-address"): + RO_interface_update["mac_address"] = icp_params["mac-address"] + if RO_interface_update: + RO_vnf_update = {"vdus": {vdu_descriptor["id"]: { + "interfaces": {vdu_interface["name"]: RO_interface_update}}}} + deep_update(RO_vnf, RO_vnf_update) + iface_found = True + break + if iface_found: + break + else: + raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:" + "internal-vld:id-ref={} is not present at vnfd:internal-" + "connection-point".format(vnf_params["member-vnf-index"], + icp_params["id-ref"])) + + if not RO_vnf["vdus"]: + del RO_vnf["vdus"] + if not RO_vnf["networks"]: + del RO_vnf["networks"] if RO_vnf: - RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf + RO_ns_params["vnfs"][vnf_params["member-vnf-index"]] = RO_vnf if ns_params.get("vld"): - for vld in ns_params["vld"]: + for vld_params in ns_params["vld"]: RO_vld = {} - if "ip-profile" in vld: - RO_vld["ip-profile"] = vld["ip-profile"] - if "vim-network-name" in vld: + if "ip-profile" in vld_params: + RO_vld["ip-profile"] = ip_profile_2_RO(vld_params["ip-profile"]) + if "vim-network-name" in vld_params: RO_vld["sites"] = [] - if isinstance(vld["vim-network-name"], dict): - for vim_account, vim_net in vld["vim-network-name"].items(): + if isinstance(vld_params["vim-network-name"], dict): + for vim_account, vim_net in vld_params["vim-network-name"].items(): RO_vld["sites"].append({ "netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account) }) else: # isinstance str - RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]}) + RO_vld["sites"].append({"netmap-use": vld_params["vim-network-name"]}) + if "vnfd-connection-point-ref" in vld_params: + for cp_params in vld_params["vnfd-connection-point-ref"]: + # look for interface + for constituent_vnfd in nsd["constituent-vnfd"]: + if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]: + vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]] + break + else: + raise LcmException( + "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} " + "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"])) + match_cp = False + for vdu_descriptor in vnf_descriptor["vdu"]: + for interface_descriptor in vdu_descriptor["interface"]: + if interface_descriptor.get("external-connection-point-ref") == \ + cp_params["vnfd-connection-point-ref"]: + match_cp = True + break + if match_cp: + break + else: + raise LcmException( + "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:" + "vnfd-connection-point-ref={} is not present at vnfd={}".format( + cp_params["member-vnf-index-ref"], + cp_params["vnfd-connection-point-ref"], + vnf_descriptor["id"])) + RO_cp_params = {} + if cp_params.get("ip-address"): + RO_cp_params["ip_address"] = cp_params["ip-address"] + if cp_params.get("mac-address"): + RO_cp_params["mac_address"] = cp_params["mac-address"] + if RO_cp_params: + RO_vnf_params = { + cp_params["member-vnf-index-ref"]: { + "vdus": { + vdu_descriptor["id"]: { + "interfaces": { + interface_descriptor["name"]: RO_cp_params + } + } + } + } + } + deep_update(RO_ns_params["vnfs"], RO_vnf_params) if RO_vld: - RO_ns_params["networks"][vld["name"]] = RO_vld + RO_ns_params["networks"][vld_params["name"]] = RO_vld return RO_ns_params - def ns_update_vnfr(self, db_vnfrs, ns_RO_info): - """ - Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated - :param db_vnfrs: - :param ns_RO_info: - :return: - """ - for vnf_index, db_vnfr in db_vnfrs.items(): - vnfr_deployed = ns_RO_info.get(vnf_index) - if not vnfr_deployed: - continue - vnfr_update = {} - db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnfr_deployed.get("ip_address") - for index, vdur in enumerate(db_vnfr["vdur"]): - vdu_deployed = vnfr_deployed["vdur"].get(vdur["vdu-id-ref"]) - if not vdu_deployed: - continue - vnfr_update["vdur.{}.vim-id".format(index)] = vdu_deployed.get("vim_id") - db_vnfr["vdur"][index]["vim-id"] = vnfr_update["vdur.{}.vim-id".format(index)] - vnfr_update["vdur.{}.ip-address".format(index)] = vdu_deployed.get("ip_address") - db_vnfr["vdur"][index]["ip-address"] = vnfr_update["vdur.{}.ip-address".format(index)] - for index2, interface in enumerate(vdur["interfaces"]): - iface_deployed = vdu_deployed["interfaces"].get(interface["name"]) - if not iface_deployed: - continue - db_vnfr["vdur"][index]["interfaces"][index2]["vim-id"] =\ - vnfr_update["vdur.{}.interfaces.{}.vim-id".format(index, index2)] = iface_deployed.get("vim_id") - db_vnfr["vdur"][index]["interfaces"][index2]["ip-address"] =\ - vnfr_update["vdur.{}.interfaces.{}.ip-address".format(index, index2)] = iface_deployed.get( - "ip_address") - db_vnfr["vdur"][index]["interfaces"][index2]["mac-address"] =\ - vnfr_update["vdur.{}.interfaces.{}.mac-address".format(index, index2)] = iface_deployed.get( - "mac_address") - self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update) - - def ns_update_vnfr_2(self, db_vnfrs, nsr_desc_RO): + def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO): """ Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated :param db_vnfrs: @@ -917,7 +971,7 @@ class Lcm: break else: - raise LcmException("ns_update_vnfr_2: Not found member_vnf_index={} at RO info".format(vnf_index)) + raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index)) async def create_monitoring(self, nsr_id, vnf_member_index, vnfd_desc): if not vnfd_desc.get("scaling-group-descriptor"): @@ -971,15 +1025,18 @@ class Lcm: db_nslcmop = None db_nsr_update = {} db_nslcmop_update = {} + nslcmop_operation_state = None db_vnfrs = {} RO_descriptor_number = 0 # number of descriptors created at RO descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO + n2vc_info = {} exc = None try: step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr={} from db".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + ns_params = db_nsr.get("instantiate_params") nsd = db_nsr["nsd"] nsr_name = db_nsr["name"] # TODO short-name?? needed_vnfd = {} @@ -1081,7 +1138,6 @@ class Lcm: # self.logger.debug(logging_text + step) # check if VIM is creating and wait look if previous tasks in process - ns_params = db_nsr.get("instantiate_params") task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"]) if task_dependency: step = "Waiting for related tasks to be completed: {}".format(task_name) @@ -1097,7 +1153,7 @@ class Lcm: self.logger.debug(logging_text + step) await asyncio.wait(task_dependency, timeout=3600) - RO_ns_params = self.ns_params_2_RO(ns_params) + RO_ns_params = self.ns_params_2_RO(ns_params, nsd, needed_vnfd) desc = await RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid) @@ -1154,8 +1210,7 @@ class Lcm: raise ROclient.ROClientException("Timeout waiting ns to be ready") step = "Updating VNFRs" - # self.ns_update_vnfr(db_vnfrs, ns_RO_info) - self.ns_update_vnfr_2(db_vnfrs, desc) + self.ns_update_vnfr(db_vnfrs, desc) db_nsr["detailed-status"] = "Configuring vnfr" self.update_db_2("nsrs", nsr_id, db_nsr_update) @@ -1163,10 +1218,12 @@ class Lcm: # The parameters we'll need to deploy a charm number_to_configure = 0 - def deploy(): + def deploy(vnf_index, vdu_id, mgmt_ip_address, n2vc_info, config_primitive=None): """An inner function to deploy the charm from either vnf or vdu + vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration """ - + if not mgmt_ip_address: + raise LcmException("vnfd/vdu has not management ip address to configure it") # Login to the VCA. # if number_to_configure == 0: # self.logger.debug("Logging into N2VC...") @@ -1188,19 +1245,26 @@ class Lcm: ) # Setup the runtime parameters for this VNF - params['rw_mgmt_ip'] = db_vnfrs[vnf_index]["ip-address"] + params = {'rw_mgmt_ip': mgmt_ip_address} + if config_primitive: + params["initial-config-primitive"] = config_primitive # ns_name will be ignored in the current version of N2VC # but will be implemented for the next point release. model_name = 'default' + vdu_id_text = "vnfd" + if vdu_id: + vdu_id_text = vdu_id application_name = self.n2vc.FormatApplicationName( nsr_name, vnf_index, - vnfd['name'], + vdu_id_text ) if not nsr_lcm.get("VCA"): nsr_lcm["VCA"] = {} - nsr_lcm["VCA"][vnf_index] = db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = { + nsr_lcm["VCA"][application_name] = db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = { + "member-vnf-index": vnf_index, + "vdu_id": vdu_id, "model": model_name, "application": application_name, "operational-status": "init", @@ -1211,6 +1275,13 @@ class Lcm: self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm)) + if not n2vc_info: + n2vc_info["nsr_id"] = nsr_id + n2vc_info["nslcmop_id"] = nslcmop_id + n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop) + n2vc_info["lcmOperationType"] = "instantiate" + n2vc_info["deployed"] = nsr_lcm["VCA"] + n2vc_info["db_update"] = db_nsr_update task = asyncio.ensure_future( self.n2vc.DeployCharms( model_name, # The network service name @@ -1220,15 +1291,13 @@ class Lcm: params, # Runtime params, like mgmt ip {}, # for native charms only self.n2vc_callback, # Callback for status changes - db_nsr, # Callback parameter - db_nslcmop, - vnf_index, # Callback parameter + n2vc_info, # Callback parameter None, # Callback parameter (task) ) ) task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None, - db_nsr, db_nslcmop, vnf_index)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + vnf_index, task) + n2vc_info)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task) step = "Looking for needed vnfd to configure" self.logger.debug(logging_text + step) @@ -1243,58 +1312,108 @@ class Lcm: if vnf_config and vnf_config.get("juju"): proxy_charm = vnf_config["juju"]["charm"] - params = {} + config_primitive = None if proxy_charm: if 'initial-config-primitive' in vnf_config: - params['initial-config-primitive'] = vnf_config['initial-config-primitive'] + config_primitive = vnf_config['initial-config-primitive'] # Login to the VCA. If there are multiple calls to login(), # subsequent calls will be a nop and return immediately. step = "connecting to N2VC to configure vnf {}".format(vnf_index) await self.n2vc.login() - deploy() + deploy(vnf_index, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info, config_primitive) number_to_configure += 1 # Deploy charms for each VDU that supports one. + vdu_index = 0 for vdu in vnfd['vdu']: vdu_config = vdu.get('vdu-configuration') proxy_charm = None - params = {} + config_primitive = None if vdu_config and vdu_config.get("juju"): proxy_charm = vdu_config["juju"]["charm"] if 'initial-config-primitive' in vdu_config: - params['initial-config-primitive'] = vdu_config['initial-config-primitive'] + config_primitive = vdu_config['initial-config-primitive'] if proxy_charm: step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index) await self.n2vc.login() - deploy() + deploy(vnf_index, vdu["id"], db_vnfrs[vnf_index]["vdur"][vdu_index]["ip-address"], + n2vc_info, config_primitive) number_to_configure += 1 + vdu_index += 1 + db_nsr_update["operational-status"] = "running" + configuration_failed = False if number_to_configure: - db_nsr_update["config-status"] = "configuring" - db_nsr_update["operational-status"] = "running" - db_nsr_update["detailed-status"] = "configuring: init: {}".format(number_to_configure) - db_nslcmop_update["detailed-status"] = "configuring: init: {}".format(number_to_configure) - else: - db_nslcmop_update["operationState"] = "COMPLETED" + old_status = "configuring: init: {}".format(number_to_configure) + db_nsr_update["config-status"] = old_status + db_nsr_update["detailed-status"] = old_status + db_nslcmop_update["detailed-status"] = old_status + + # wait until all are configured. + while True: + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + await n2vc_info["n2vc_event"].wait() + n2vc_info["n2vc_event"].clear() + all_active = True + status_map = {} + n2vc_error_text = [] # contain text error list. If empty no one is in error status + for _, vca_info in nsr_lcm["VCA"].items(): + vca_status = vca_info["operational-status"] + if vca_status not in status_map: + # Initialize it + status_map[vca_status] = 0 + status_map[vca_status] += 1 + + if vca_status != "active": + all_active = False + if vca_status in ("error", "blocked"): + n2vc_error_text.append( + "member_vnf_index={} vdu_id={} {}: {}".format(vca_info["member-vnf-index"], + vca_info["vdu_id"], vca_status, + vca_info["detailed-status"])) + + if all_active: + break + elif n2vc_error_text: + db_nsr_update["config-status"] = "failed" + error_text = "fail configuring " + ";".join(n2vc_error_text) + db_nsr_update["detailed-status"] = error_text + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP" + db_nslcmop_update["detailed-status"] = error_text + db_nslcmop_update["statusEnteredTime"] = time() + configuration_failed = True + break + else: + cs = "configuring: " + separator = "" + for status, num in status_map.items(): + cs += separator + "{}: {}".format(status, num) + separator = ", " + if old_status != cs: + db_nsr_update["config-status"] = cs + db_nsr_update["detailed-status"] = cs + db_nslcmop_update["detailed-status"] = cs + old_status = cs + + if not configuration_failed: + # all is done + db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" db_nslcmop_update["statusEnteredTime"] = time() db_nslcmop_update["detailed-status"] = "done" db_nsr_update["config-status"] = "configured" db_nsr_update["detailed-status"] = "done" - db_nsr_update["operational-status"] = "running" - step = "Sending monitoring parameters to PM" + + # step = "Sending monitoring parameters to PM" # for c_vnf in nsd["constituent-vnfd"]: # await self.create_monitoring(nsr_id, c_vnf["member-vnf-index"], needed_vnfd[c_vnf["vnfd-id-ref"]]) - try: - await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - - self.logger.debug(logging_text + "Exit") return except (ROclient.ROClientException, DbException, LcmException) as e: @@ -1314,12 +1433,20 @@ class Lcm: db_nsr_update["operational-status"] = "failed" if db_nslcmop: db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" db_nslcmop_update["statusEnteredTime"] = time() if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") async def ns_terminate(self, nsr_id, nslcmop_id): @@ -1333,6 +1460,7 @@ class Lcm: vca_task_dict = {} db_nsr_update = {} db_nslcmop_update = {} + nslcmop_operation_state = None try: step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) @@ -1356,52 +1484,84 @@ class Lcm: db_nsr_update["detailed-status"] = "Deleting charms" self.logger.debug(logging_text + step) self.update_db_2("nsrs", nsr_id, db_nsr_update) - for vnf_index, deploy_info in nsr_lcm["VCA"].items(): - if deploy_info and deploy_info.get("application"): + for application_name, deploy_info in nsr_lcm["VCA"].items(): + if deploy_info: # TODO it would be desirable having a and deploy_info.get("deployed"): task = asyncio.ensure_future( self.n2vc.RemoveCharms( deploy_info['model'], - deploy_info['application'], + application_name, # self.n2vc_callback, # db_nsr, # db_nslcmop, - # vnf_index, ) ) vca_task_list.append(task) - vca_task_dict[vnf_index] = task + vca_task_dict[application_name] = task # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'], # deploy_info['application'], None, db_nsr, # db_nslcmop, vnf_index)) - self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task + self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + application_name] = task except Exception as e: self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e)) # remove from RO RO_fail = False RO = ROclient.ROClient(self.loop, **self.ro_config) + # Delete ns - if nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsr_id"): - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] - try: + RO_nsr_id = RO_delete_action = None + if nsr_lcm and nsr_lcm.get("RO"): + RO_nsr_id = nsr_lcm["RO"].get("nsr_id") + RO_delete_action = nsr_lcm["RO"].get("nsr_delete_action_id") + try: + if RO_nsr_id: step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO" self.logger.debug(logging_text + step) - await RO.delete("ns", RO_nsr_id) + desc = await RO.delete("ns", RO_nsr_id) + RO_delete_action = desc["action_id"] + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action db_nsr_update["_admin.deployed.RO.nsr_id"] = None db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" - except ROclient.ROClientException as e: - if e.http_code == 404: # not found - db_nsr_update["_admin.deployed.RO.nsr_id"] = None - db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" - self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id)) - elif e.http_code == 409: # conflict - failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e)) - self.logger.debug(logging_text + failed_detail[-1]) - RO_fail = True - else: - failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e)) - self.logger.error(logging_text + failed_detail[-1]) - RO_fail = True + if RO_delete_action: + # wait until NS is deleted from VIM + step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id) + detailed_status_old = None + self.logger.debug(logging_text + step) + + delete_timeout = 20 * 60 # 20 minutes + while delete_timeout > 0: + desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action", + extra_item_id=RO_delete_action) + ns_status, ns_status_info = RO.check_action_status(desc) + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + detailed_status = step + "; {}".format(ns_status_info) + elif ns_status == "ACTIVE": + break + else: + assert False, "ROclient.check_action_status returns unknown {}".format(ns_status) + await asyncio.sleep(5, loop=self.loop) + delete_timeout -= 5 + if detailed_status != detailed_status_old: + detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + else: # delete_timeout <= 0: + raise ROclient.ROClientException("Timeout waiting ns deleted from VIM") + + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + db_nsr_update["_admin.deployed.RO.nsr_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" + self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id)) + elif e.http_code == 409: # conflict + failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) + RO_fail = True + else: + failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e)) + self.logger.error(logging_text + failed_detail[-1]) + RO_fail = True # Delete nsd if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"): @@ -1452,31 +1612,32 @@ class Lcm: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) self.update_db_2("nsrs", nsr_id, db_nsr_update) await asyncio.wait(vca_task_list, timeout=300) - for vnf_index, task in vca_task_dict.items(): + for application_name, task in vca_task_dict.items(): if task.cancelled(): - failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index)) + failed_detail.append("VCA[{}] Deletion has been cancelled".format(application_name)) elif task.done(): exc = task.exception() if exc: - failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc)) + failed_detail.append("VCA[{}] Deletion exception: {}".format(application_name, exc)) else: - db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = None + db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = None else: # timeout # TODO Should it be cancelled?!! task.cancel() - failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index)) + failed_detail.append("VCA[{}] Deletion timeout".format(application_name)) if failed_detail: self.logger.error(logging_text + " ;".join(failed_detail)) db_nsr_update["operational-status"] = "failed" db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail) db_nslcmop_update["detailed-status"] = "; ".join(failed_detail) - db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" db_nslcmop_update["statusEnteredTime"] = time() elif db_nslcmop["operationParams"].get("autoremove"): self.db.del_one("nsrs", {"_id": nsr_id}) db_nsr_update.clear() self.db.del_list("nslcmops", {"nsInstanceId": nsr_id}) + nslcmop_operation_state = "COMPLETED" db_nslcmop_update.clear() self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id}) self.logger.debug(logging_text + "Delete from database") @@ -1485,13 +1646,8 @@ class Lcm: db_nsr_update["detailed-status"] = "Done" db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" db_nslcmop_update["statusEnteredTime"] = time() - try: - await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - self.logger.debug(logging_text + "Exit") except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -1504,21 +1660,36 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) finally: if exc and db_nslcmop: - db_nslcmop_update = { - "detailed-status": "FAILED {}: {}".format(step, exc), - "operationState": "FAILED", - "statusEnteredTime": time(), - } + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") - async def _ns_execute_primitive(self, db_deployed, member_vnf_index, primitive, primitive_params): - vca_deployed = db_deployed["VCA"].get(member_vnf_index) + async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, primitive, primitive_params): + + vdu_id_text = "vnfd" + if vdu_id: + vdu_id_text = vdu_id + application_name = self.n2vc.FormatApplicationName( + nsr_name, + member_vnf_index, + vdu_id_text + ) + vca_deployed = db_deployed["VCA"].get(application_name) if not vca_deployed: - raise LcmException("charm for member_vnf_index={} is not deployed".format(member_vnf_index)) + raise LcmException("charm for member_vnf_index={} vdu_id={} is not deployed".format(member_vnf_index, + vdu_id)) model_name = vca_deployed.get("model") application_name = vca_deployed.get("application") if not model_name or not application_name: @@ -1568,24 +1739,26 @@ class Lcm: # get all needed from database db_nsr = None db_nslcmop = None - db_nslcmop_update = None + db_nslcmop_update = {} + nslcmop_operation_state = None exc = None try: step = "Getting information from database" db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) nsr_lcm = db_nsr["_admin"].get("deployed") + nsr_name = db_nsr["name"] vnf_index = db_nslcmop["operationParams"]["member_vnf_index"] + vdu_id = db_nslcmop["operationParams"].get("vdu_id") # TODO check if ns is in a proper status primitive = db_nslcmop["operationParams"]["primitive"] primitive_params = db_nslcmop["operationParams"]["primitive_params"] - result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index, primitive, primitive_params) - db_nslcmop_update = { - "detailed-status": result_detail, - "operationState": result, - "statusEnteredTime": time() - } + result, result_detail = await self._ns_execute_primitive(nsr_lcm, nsr_name, vnf_index, vdu_id, primitive, + primitive_params) + db_nslcmop_update["detailed-status"] = result_detail + db_nslcmop_update["operationState"] = nslcmop_operation_state = result + db_nslcmop_update["statusEnteredTime"] = time() self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail)) return # database update is called inside finally @@ -1600,13 +1773,19 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) finally: if exc and db_nslcmop: - db_nslcmop_update = { - "detailed-status": "FAILED {}: {}".format(step, exc), - "operationState": "FAILED", - "statusEnteredTime": time(), - } + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + self.logger.debug(logging_text + "Exit") + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action") async def ns_scale(self, nsr_id, nslcmop_id): @@ -1616,6 +1795,7 @@ class Lcm: db_nsr = None db_nslcmop = None db_nslcmop_update = {} + nslcmop_operation_state = None db_nsr_update = {} exc = None try: @@ -1801,8 +1981,7 @@ class Lcm: raise ROclient.ROClientException("Timeout waiting ns to be ready") step = "Updating VNFRs" - # self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, ns_RO_info) - self.ns_update_vnfr_2({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc) + self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc) # update VDU_SCALING_INFO with the obtained ip_addresses if vdu_scaling_info["scaling_direction"] == "OUT": @@ -1857,19 +2036,11 @@ class Lcm: if result == "FAILED": raise LcmException(result_detail) - db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" db_nslcmop_update["statusEnteredTime"] = time() db_nslcmop_update["detailed-status"] = "done" db_nsr_update["detailed-status"] = "done" db_nsr_update["operational-status"] = "running" - try: - await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - # if cooldown_time: - # await asyncio.sleep(cooldown_time) - # await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - self.logger.debug(logging_text + "Exit Ok") return except (ROclient.ROClientException, DbException, LcmException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -1883,11 +2054,9 @@ class Lcm: finally: if exc: if db_nslcmop: - db_nslcmop_update = { - "detailed-status": "FAILED {}: {}".format(step, exc), - "operationState": "FAILED", - "statusEnteredTime": time(), - } + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() if db_nsr: db_nsr_update["operational-status"] = "FAILED {}: {}".format(step, exc), db_nsr_update["detailed-status"] = "failed" @@ -1895,6 +2064,16 @@ class Lcm: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + # if cooldown_time: + # await asyncio.sleep(cooldown_time) + # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") async def test(self, param=None):