X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fnetslice.py;h=3a8002ce24bbf78c7349757ff5cabbb7f9592b89;hb=b4ebcd0dd81e08d7294156c2aca10d84ca9bb824;hp=6f742c295001b87c816d95f189f7051840e3ac4a;hpb=2e2155173c047ac2acb8a626019eda2d87f785ed;p=osm%2FLCM.git diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py index 6f742c2..3a8002c 100644 --- a/osm_lcm/netslice.py +++ b/osm_lcm/netslice.py @@ -1,184 +1,610 @@ # -*- coding: utf-8 -*- +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +## import asyncio import logging import logging.handlers import traceback -import ns -from lcm_utils import LcmException, LcmBase +from osm_lcm import ROclient +from osm_lcm.lcm_utils import ( + LcmException, + LcmBase, + populate_dict, + get_iterable, + deep_get, +) from osm_common.dbbase import DbException from time import time +from copy import deepcopy + __author__ = "Felipe Vicens, Pol Alemany, Alfonso Tierno" class NetsliceLcm(LcmBase): - def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop): + timeout_nsi_deploy = 2 * 3600 # default global timeout for deployment a nsi + + def __init__(self, msg, lcm_tasks, config, loop, ns): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', :return: None """ # logging - self.logger = logging.getLogger('lcm.netslice') + self.logger = logging.getLogger("lcm.netslice") self.loop = loop self.lcm_tasks = lcm_tasks - self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, ro_config, vca_config, loop) + self.ns = ns + self.ro_config = config["ro_config"] + self.timeout = config["timeout"] + + super().__init__(msg, self.logger) + + def nsi_update_nsir(self, nsi_update_nsir, db_nsir, nsir_desc_RO): + """ + Updates database nsir with the RO info for the created vld + :param nsi_update_nsir: dictionary to be filled with the updated info + :param db_nsir: content of db_nsir. This is also modified + :param nsir_desc_RO: nsir descriptor from RO + :return: Nothing, LcmException is raised on errors + """ - super().__init__(db, msg, fs, self.logger) + for vld_index, vld in enumerate(get_iterable(db_nsir, "vld")): + for net_RO in get_iterable(nsir_desc_RO, "nets"): + if vld["id"] != net_RO.get("ns_net_osm_id"): + continue + vld["vim-id"] = net_RO.get("vim_net_id") + vld["name"] = net_RO.get("vim_name") + vld["status"] = net_RO.get("status") + vld["status-detailed"] = net_RO.get("error_msg") + nsi_update_nsir["vld.{}".format(vld_index)] = vld + break + else: + raise LcmException( + "ns_update_nsir: Not found vld={} at RO info".format(vld["id"]) + ) - # TODO: check logging_text within the self.logger.info/debug async def instantiate(self, nsir_id, nsilcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("nsi", "nsilcmops", nsilcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database - exc = None + exc = None db_nsir = None db_nsilcmop = None db_nsir_update = {"_admin.nsilcmop": nsilcmop_id} db_nsilcmop_update = {} nsilcmop_operation_state = None + vim_2_RO = {} + RO = ROclient.ROClient(self.loop, **self.ro_config) + nsi_vld_instantiationi_params = {} + + def ip_profile_2_RO(ip_profile): + RO_ip_profile = deepcopy((ip_profile)) + if "dns-server" in RO_ip_profile: + if isinstance(RO_ip_profile["dns-server"], list): + RO_ip_profile["dns-address"] = [] + for ds in RO_ip_profile.pop("dns-server"): + RO_ip_profile["dns-address"].append(ds["address"]) + else: + RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server") + if RO_ip_profile.get("ip-version") == "ipv4": + RO_ip_profile["ip-version"] = "IPv4" + if RO_ip_profile.get("ip-version") == "ipv6": + RO_ip_profile["ip-version"] = "IPv6" + if "dhcp-params" in RO_ip_profile: + RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params") + return RO_ip_profile + + def vim_account_2_RO(vim_account): + """ + Translate a RO vim_account from OSM vim_account params + :param ns_params: OSM instantiate params + :return: The RO ns descriptor + """ + if vim_account in vim_2_RO: + return vim_2_RO[vim_account] + + db_vim = self.db.get_one("vim_accounts", {"_id": vim_account}) + if db_vim["_admin"]["operationalState"] != "ENABLED": + raise LcmException( + "VIM={} is not available. operationalState={}".format( + vim_account, db_vim["_admin"]["operationalState"] + ) + ) + RO_vim_id = db_vim["_admin"]["deployed"]["RO"] + vim_2_RO[vim_account] = RO_vim_id + return RO_vim_id + + async def netslice_scenario_create( + self, vld_item, nsir_id, db_nsir, db_nsir_admin, db_nsir_update + ): + """ + Create a network slice VLD through RO Scenario + :param vld_id The VLD id inside nsir to be created + :param nsir_id The nsir id + """ + nonlocal nsi_vld_instantiationi_params + ip_vld = None + mgmt_network = False + RO_vld_sites = [] + vld_id = vld_item["id"] + netslice_vld = vld_item + # logging_text = "Task netslice={} instantiate_vld={} ".format(nsir_id, vld_id) + # self.logger.debug(logging_text + "Enter") + + vld_shared = None + for shared_nsrs_item in get_iterable(vld_item, "shared-nsrs-list"): + _filter = { + "_id.ne": nsir_id, + "_admin.nsrs-detailed-list.ANYINDEX.nsrId": shared_nsrs_item, + } + shared_nsi = self.db.get_one( + "nsis", _filter, fail_on_empty=False, fail_on_more=False + ) + if shared_nsi: + for vlds in get_iterable(shared_nsi["_admin"]["deployed"], "RO"): + if vld_id == vlds["vld_id"]: + vld_shared = { + "instance_scenario_id": vlds["netslice_scenario_id"], + "osm_id": vld_id, + } + break + break + + # Creating netslice-vld at RO + RO_nsir = deep_get(db_nsir, ("_admin", "deployed", "RO"), []) + + if vld_id in RO_nsir: + db_nsir_update["_admin.deployed.RO"] = RO_nsir + + # If netslice-vld doesn't exists then create it + else: + # TODO: Check VDU type in all descriptors finding SRIOV / PT + # Updating network names and datacenters from instantiation parameters for each VLD + for instantiation_params_vld in get_iterable( + db_nsir["instantiation_parameters"], "netslice-vld" + ): + if instantiation_params_vld.get("name") == netslice_vld["name"]: + ip_vld = deepcopy(instantiation_params_vld) + ip_vld.pop("name") + nsi_vld_instantiationi_params[netslice_vld["name"]] = ip_vld + + db_nsir_update_RO = {} + db_nsir_update_RO["vld_id"] = netslice_vld["name"] + if self.ro_config["ng"]: + db_nsir_update_RO["netslice_scenario_id"] = ( + vld_shared.get("instance_scenario_id") + if vld_shared + else "nsir:{}:vld.{}".format(nsir_id, netslice_vld["name"]) + ) + else: # if not self.ro_config["ng"]: + if netslice_vld.get("mgmt-network"): + mgmt_network = True + RO_ns_params = {} + RO_ns_params["name"] = netslice_vld["name"] + RO_ns_params["datacenter"] = vim_account_2_RO( + db_nsir["instantiation_parameters"]["vimAccountId"] + ) + + # Creating scenario if vim-network-name / vim-network-id are present as instantiation parameter + # Use vim-network-id instantiation parameter + vim_network_option = None + if ip_vld: + if ip_vld.get("vim-network-id"): + vim_network_option = "vim-network-id" + elif ip_vld.get("vim-network-name"): + vim_network_option = "vim-network-name" + if ip_vld.get("ip-profile"): + populate_dict( + RO_ns_params, + ("networks", netslice_vld["name"], "ip-profile"), + ip_profile_2_RO(ip_vld["ip-profile"]), + ) + + if vim_network_option: + if ip_vld.get(vim_network_option): + if isinstance(ip_vld.get(vim_network_option), list): + for vim_net_id in ip_vld.get(vim_network_option): + for vim_account, vim_net in vim_net_id.items(): + RO_vld_sites.append( + { + "netmap-use": vim_net, + "datacenter": vim_account_2_RO( + vim_account + ), + } + ) + elif isinstance(ip_vld.get(vim_network_option), dict): + for vim_account, vim_net in ip_vld.get( + vim_network_option + ).items(): + RO_vld_sites.append( + { + "netmap-use": vim_net, + "datacenter": vim_account_2_RO(vim_account), + } + ) + else: + RO_vld_sites.append( + { + "netmap-use": ip_vld[vim_network_option], + "datacenter": vim_account_2_RO( + netslice_vld["vimAccountId"] + ), + } + ) + + # Use default netslice vim-network-name from template + else: + for nss_conn_point_ref in get_iterable( + netslice_vld, "nss-connection-point-ref" + ): + if nss_conn_point_ref.get("vimAccountId"): + if ( + nss_conn_point_ref["vimAccountId"] + != netslice_vld["vimAccountId"] + ): + RO_vld_sites.append( + { + "netmap-create": None, + "datacenter": vim_account_2_RO( + nss_conn_point_ref["vimAccountId"] + ), + } + ) + + if vld_shared: + populate_dict( + RO_ns_params, + ("networks", netslice_vld["name"], "use-network"), + vld_shared, + ) + + if RO_vld_sites: + populate_dict( + RO_ns_params, + ("networks", netslice_vld["name"], "sites"), + RO_vld_sites, + ) + + RO_ns_params["scenario"] = { + "nets": [ + { + "name": netslice_vld["name"], + "external": mgmt_network, + "type": "bridge", + } + ] + } + + # self.logger.debug(logging_text + step) + desc = await RO.create("ns", descriptor=RO_ns_params) + db_nsir_update_RO["netslice_scenario_id"] = desc["uuid"] + db_nsir_update["_admin.deployed.RO"].append(db_nsir_update_RO) + + def overwrite_nsd_params(self, db_nsir, nslcmop): + nonlocal nsi_vld_instantiationi_params + nonlocal db_nsir_update + vld_op_list = [] + vld = None + nsr_id = nslcmop.get("nsInstanceId") + # Overwrite instantiation parameters in netslice runtime + RO_list = db_nsir_admin["deployed"]["RO"] + + for ro_item_index, RO_item in enumerate(RO_list): + netslice_vld = next( + ( + n + for n in get_iterable(db_nsir["_admin"], "netslice-vld") + if RO_item.get("vld_id") == n.get("id") + ), + None, + ) + if not netslice_vld: + continue + # if is equal vld of _admin with vld of netslice-vld then go for the CPs + # Search the cp of netslice-vld that match with nst:netslice-subnet + for nss_cp_item in get_iterable( + netslice_vld, "nss-connection-point-ref" + ): + # Search the netslice-subnet of nst that match + nss = next( + ( + nss + for nss in get_iterable( + db_nsir["_admin"], "netslice-subnet" + ) + if nss_cp_item["nss-ref"] == nss["nss-id"] + ), + None, + ) + # Compare nss-ref equal nss from nst + if not nss: + continue + db_nsds = self.db.get_one("nsds", {"_id": nss["nsdId"]}) + # Go for nsd, and search the CP that match with nst:CP to get vld-id-ref + for cp_nsd in db_nsds.get("sapd", ()): + if cp_nsd["id"] == nss_cp_item["nsd-connection-point-ref"]: + if nslcmop.get("operationParams"): + if ( + nslcmop["operationParams"].get("nsName") + == nss["nsName"] + ): + vld_id = RO_item["vld_id"] + netslice_scenario_id = RO_item[ + "netslice_scenario_id" + ] + nslcmop_vld = {} + nslcmop_vld["name"] = cp_nsd["virtual-link-desc"] + for vld in get_iterable( + nslcmop["operationParams"], "vld" + ): + if vld["name"] == cp_nsd["virtual-link-desc"]: + nslcmop_vld.update(vld) + if self.ro_config["ng"]: + nslcmop_vld["common_id"] = netslice_scenario_id + nslcmop_vld.update( + nsi_vld_instantiationi_params.get( + RO_item["vld_id"], {} + ) + ) + else: + nslcmop_vld["ns-net"] = { + vld_id: netslice_scenario_id + } + vld_op_list.append(nslcmop_vld) + nslcmop["operationParams"]["vld"] = vld_op_list + self.update_db_2( + "nslcmops", nslcmop["_id"], {"operationParams.vld": vld_op_list} + ) + return nsr_id, nslcmop try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA("nsi", "nsilcmops", nsilcmop_id) + step = "Getting nsir={} from db".format(nsir_id) db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) step = "Getting nsilcmop={} from db".format(nsilcmop_id) db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id}) - - # look if previous tasks is in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id) - if task_dependency: - step = db_nsilcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - - # Empty list to keep track of network service records status in the netslice - nsir_admin = db_nsir["_admin"] - nsir_admin["nsrs-detailed-list"] = [] + start_deploy = time() + nsi_params = db_nsilcmop.get("operationParams") + if nsi_params and nsi_params.get("timeout_nsi_deploy"): + timeout_nsi_deploy = nsi_params["timeout_nsi_deploy"] + else: + timeout_nsi_deploy = self.timeout.get( + "nsi_deploy", self.timeout_nsi_deploy + ) + # Empty list to keep track of network service records status in the netslice + nsir_admin = db_nsir_admin = db_nsir.get("_admin") + + step = "Creating slice operational-status init" # Slice status Creating db_nsir_update["detailed-status"] = "creating" db_nsir_update["operational-status"] = "init" - self.update_db_2("nsis", nsir_id, db_nsir_update) + db_nsir_update["_admin.nsiState"] = "INSTANTIATED" + + step = "Instantiating netslice VLDs before NS instantiation" + # Creating netslice VLDs networking before NS instantiation + db_nsir_update["detailed-status"] = step + self.update_db_2("nsis", nsir_id, db_nsir_update) + db_nsir_update["_admin.deployed.RO"] = db_nsir_admin["deployed"]["RO"] + for vld_item in get_iterable(nsir_admin, "netslice-vld"): + await netslice_scenario_create( + self, vld_item, nsir_id, db_nsir, db_nsir_admin, db_nsir_update + ) + + step = "Instantiating netslice subnets" + db_nsir_update["detailed-status"] = step + self.update_db_2("nsis", nsir_id, db_nsir_update) + + db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) + + # Check status of the VLDs and wait for creation + # netslice_scenarios = db_nsir["_admin"]["deployed"]["RO"] + # db_nsir_update_RO = deepcopy(netslice_scenarios) + # for netslice_scenario in netslice_scenarios: + # await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"], + # nsir_id, db_nsir_update_RO) + + # db_nsir_update["_admin.deployed.RO"] = db_nsir_update_RO + # self.update_db_2("nsis", nsir_id, db_nsir_update) # Iterate over the network services operation ids to instantiate NSs - # TODO: (future improvement) look another way check the tasks instead of keep asking - # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives - # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300) - # ns_tasks = [] + step = "Instantiating Netslice Subnets" + db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids") for nslcmop_id in nslcmop_ids: nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) - nsr_id = nslcmop.get("nsInstanceId") - step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop) + # Overwriting netslice-vld vim-net-id to ns + nsr_id, nslcmop = overwrite_nsd_params(self, db_nsir, nslcmop) + step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop_id) task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) + self.lcm_tasks.register( + "ns", nsr_id, nslcmop_id, "ns_instantiate", task + ) # Wait until Network Slice is ready - step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id) + step = " Waiting nsi ready." nsrs_detailed_list_old = None self.logger.debug(logging_text + step) - - # TODO: substitute while for await (all task to be done or not) - deployment_timeout = 2 * 3600 # Two hours - while deployment_timeout > 0: + + # For HA, it is checked from database, as the ns operation may be managed by other LCM worker + while time() <= start_deploy + timeout_nsi_deploy: # Check ns instantiation status nsi_ready = True - nsrs_detailed_list = [] + nsir = self.db.get_one("nsis", {"_id": nsir_id}) + nsrs_detailed_list = nsir["_admin"]["nsrs-detailed-list"] + nsrs_detailed_list_new = [] for nslcmop_item in nslcmop_ids: nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_item}) status = nslcmop.get("operationState") # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK - nsrs_detailed_list.append({"nsrId": nslcmop["nsInstanceId"], "status": nslcmop["operationState"], - "detailed-status": - nsir_status_detailed + "; {}".format(nslcmop.get("detailed-status"))}) - if status not in ["COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP"]: + for nss in nsrs_detailed_list: + if nss["nsrId"] == nslcmop["nsInstanceId"]: + nss.update( + { + "nsrId": nslcmop["nsInstanceId"], + "status": nslcmop["operationState"], + "detailed-status": nslcmop.get("detailed-status"), + "instantiated": True, + } + ) + nsrs_detailed_list_new.append(nss) + if status not in [ + "COMPLETED", + "PARTIALLY_COMPLETED", + "FAILED", + "FAILED_TEMP", + ]: nsi_ready = False - if nsrs_detailed_list != nsrs_detailed_list_old: - nsir_admin["nsrs-detailed-list"] = nsrs_detailed_list - nsrs_detailed_list_old = nsrs_detailed_list - db_nsir_update["_admin"] = nsir_admin - self.update_db_2("nsis", nsir_id, db_nsir_update) + if nsrs_detailed_list_new != nsrs_detailed_list_old: + nsrs_detailed_list_old = nsrs_detailed_list_new + self.update_db_2( + "nsis", + nsir_id, + {"_admin.nsrs-detailed-list": nsrs_detailed_list_new}, + ) if nsi_ready: - step = "Network Slice Instance is ready. nsi_id={}".format(nsir_id) - for items in nsrs_detailed_list: - if "FAILED" in items.values(): - raise LcmException("Error deploying NSI: {}".format(nsir_id)) + error_list = [] + step = "Network Slice Instance instantiated" + for nss in nsrs_detailed_list: + if nss["status"] in ("FAILED", "FAILED_TEMP"): + error_list.append( + "NS {} {}: {}".format( + nss["nsrId"], nss["status"], nss["detailed-status"] + ) + ) + if error_list: + step = "instantiating" + raise LcmException("; ".join(error_list)) break - + # TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300) await asyncio.sleep(5, loop=self.loop) - deployment_timeout -= 5 - - if deployment_timeout <= 0: - raise LcmException("Timeout waiting nsi to be ready. nsi_id={}".format(nsir_id)) + + else: # timeout_nsi_deploy reached: + raise LcmException("Timeout waiting nsi to be ready.") db_nsir_update["operational-status"] = "running" db_nsir_update["detailed-status"] = "done" db_nsir_update["config-status"] = "configured" - db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED" + db_nsilcmop_update[ + "operationState" + ] = nsilcmop_operation_state = "COMPLETED" db_nsilcmop_update["statusEnteredTime"] = time() db_nsilcmop_update["detailed-status"] = "done" return except (LcmException, DbException) as e: - self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) + self.logger.error( + logging_text + "Exit Exception while '{}': {}".format(step, e) + ) exc = e except asyncio.CancelledError: - self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) exc = "Operation was cancelled" except Exception as e: exc = traceback.format_exc() - self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), - exc_info=True) + self.logger.critical( + logging_text + + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), + exc_info=True, + ) finally: if exc: if db_nsir: db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc) db_nsir_update["operational-status"] = "failed" + db_nsir_update["config-status"] = "configured" if db_nsilcmop: - db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED" + db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format( + step, exc + ) + db_nsilcmop_update[ + "operationState" + ] = nsilcmop_operation_state = "FAILED" db_nsilcmop_update["statusEnteredTime"] = time() - if db_nsir: - db_nsir_update["_admin.nsiState"] = "INSTANTIATED" - db_nsir_update["_admin.nsilcmop"] = None - self.update_db_2("nsis", nsir_id, db_nsir_update) - if db_nsilcmop: - - self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + try: + if db_nsir: + db_nsir_update["_admin.nsilcmop"] = None + self.update_db_2("nsis", nsir_id, db_nsir_update) + if db_nsilcmop: + self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + except DbException as e: + self.logger.error(logging_text + "Cannot update database: {}".format(e)) if nsilcmop_operation_state: try: - await self.msg.aiowrite("nsi", "instantiated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id, - "operationState": nsilcmop_operation_state}) + await self.msg.aiowrite( + "nsi", + "instantiated", + { + "nsir_id": nsir_id, + "nsilcmop_id": nsilcmop_id, + "operationState": nsilcmop_operation_state, + }, + ) except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate") async def terminate(self, nsir_id, nsilcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("nsi", "nsilcmops", nsilcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id) self.logger.debug(logging_text + "Enter") - exc = None + exc = None db_nsir = None db_nsilcmop = None db_nsir_update = {"_admin.nsilcmop": nsilcmop_id} db_nsilcmop_update = {} + RO = ROclient.ROClient(self.loop, **self.ro_config) + nsir_deployed = None + failed_detail = [] # annotates all failed error messages nsilcmop_operation_state = None + autoremove = False # autoremove after terminated try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA("nsi", "nsilcmops", nsilcmop_id) + step = "Getting nsir={} from db".format(nsir_id) db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) + nsir_deployed = deepcopy(db_nsir["_admin"].get("deployed")) step = "Getting nsilcmop={} from db".format(nsilcmop_id) db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id}) - + # TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate # CASE: Instance was terminated but there is a second request to terminate the instance if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED": @@ -187,113 +613,262 @@ class NetsliceLcm(LcmBase): # Slice status Terminating db_nsir_update["operational-status"] = "terminating" db_nsir_update["config-status"] = "terminating" + db_nsir_update["detailed-status"] = "Terminating Netslice subnets" self.update_db_2("nsis", nsir_id, db_nsir_update) - # look if previous tasks is in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id) - if task_dependency: - step = db_nsilcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - # Gets the list to keep track of network service records status in the netslice - nsir_admin = db_nsir["_admin"] - nsrs_detailed_list = [] + nsrs_detailed_list = [] # Iterate over the network services operation ids to terminate NSs - # TODO: (future improvement) look another way check the tasks instead of keep asking + # TODO: (future improvement) look another way check the tasks instead of keep asking # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300) - # ns_tasks = [] + step = "Terminating Netslice Subnets" nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids") + nslcmop_new = [] for nslcmop_id in nslcmop_ids: nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) nsr_id = nslcmop["operationParams"].get("nsInstanceId") - task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) + nss_in_use = self.db.get_list( + "nsis", + { + "_admin.netslice-vld.ANYINDEX.shared-nsrs-list": nsr_id, + "operational-status": {"$nin": ["terminated", "failed"]}, + }, + ) + if len(nss_in_use) < 2: + task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) + self.lcm_tasks.register( + "ns", nsr_id, nslcmop_id, "ns_instantiate", task + ) + nslcmop_new.append(nslcmop_id) + else: + # Update shared nslcmop shared with active nsi + netsliceInstanceId = db_nsir["_id"] + for nsis_item in nss_in_use: + if db_nsir["_id"] != nsis_item["_id"]: + netsliceInstanceId = nsis_item["_id"] + break + self.db.set_one( + "nslcmops", + {"_id": nslcmop_id}, + {"operationParams.netsliceInstanceId": netsliceInstanceId}, + ) + self.db.set_one( + "nsilcmops", + {"_id": nsilcmop_id}, + {"operationParams.nslcmops_ids": nslcmop_new}, + ) # Wait until Network Slice is terminated - step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id) + step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format( + nsir_id + ) nsrs_detailed_list_old = None self.logger.debug(logging_text + step) - - termination_timeout = 2 * 3600 # Two hours + + termination_timeout = 2 * 3600 # Two hours while termination_timeout > 0: # Check ns termination status nsi_ready = True - nsrs_detailed_list = [] + db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) + nsrs_detailed_list = db_nsir["_admin"].get("nsrs-detailed-list") + nsrs_detailed_list_new = [] for nslcmop_item in nslcmop_ids: nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_item}) status = nslcmop["operationState"] - # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK - nsrs_detailed_list.append({"nsrId": nslcmop["nsInstanceId"], "status": nslcmop["operationState"], - "detailed-status": - nsir_status_detailed + "; {}".format(nslcmop.get("detailed-status"))}) - if status not in ["COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP"]: + # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK + for nss in nsrs_detailed_list: + if nss["nsrId"] == nslcmop["nsInstanceId"]: + nss.update( + { + "nsrId": nslcmop["nsInstanceId"], + "status": nslcmop["operationState"], + "detailed-status": nsir_status_detailed + + "; {}".format(nslcmop.get("detailed-status")), + } + ) + nsrs_detailed_list_new.append(nss) + if status not in [ + "COMPLETED", + "PARTIALLY_COMPLETED", + "FAILED", + "FAILED_TEMP", + ]: nsi_ready = False - if nsrs_detailed_list != nsrs_detailed_list_old: - nsir_admin["nsrs-detailed-list"] = nsrs_detailed_list - nsrs_detailed_list_old = nsrs_detailed_list - db_nsir_update["_admin"] = nsir_admin - self.update_db_2("nsis", nsir_id, db_nsir_update) - + if nsrs_detailed_list_new != nsrs_detailed_list_old: + nsrs_detailed_list_old = nsrs_detailed_list_new + self.update_db_2( + "nsis", + nsir_id, + {"_admin.nsrs-detailed-list": nsrs_detailed_list_new}, + ) + if nsi_ready: - step = "Network Slice Instance is terminated. nsi_id={}".format(nsir_id) + # Check if it is the last used nss and mark isinstantiate: False + db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) + nsrs_detailed_list = db_nsir["_admin"].get("nsrs-detailed-list") + for nss in nsrs_detailed_list: + _filter = { + "_admin.nsrs-detailed-list.ANYINDEX.nsrId": nss["nsrId"], + "operational-status.ne": "terminated", + "_id.ne": nsir_id, + } + nsis_list = self.db.get_one( + "nsis", _filter, fail_on_empty=False, fail_on_more=False + ) + if not nsis_list: + nss.update({"instantiated": False}) + + step = "Network Slice Instance is terminated. nsi_id={}".format( + nsir_id + ) for items in nsrs_detailed_list: if "FAILED" in items.values(): - raise LcmException("Error terminating NSI: {}".format(nsir_id)) + raise LcmException( + "Error terminating NSI: {}".format(nsir_id) + ) break await asyncio.sleep(5, loop=self.loop) termination_timeout -= 5 if termination_timeout <= 0: - raise LcmException("Timeout waiting nsi to be terminated. nsi_id={}".format(nsir_id)) + raise LcmException( + "Timeout waiting nsi to be terminated. nsi_id={}".format(nsir_id) + ) + + # Delete netslice-vlds + RO_nsir_id = RO_delete_action = None + for nsir_deployed_RO in get_iterable(nsir_deployed, "RO"): + RO_nsir_id = nsir_deployed_RO.get("netslice_scenario_id") + try: + if not self.ro_config["ng"]: + step = db_nsir_update[ + "detailed-status" + ] = "Deleting netslice-vld at RO" + db_nsilcmop_update[ + "detailed-status" + ] = "Deleting netslice-vld at RO" + self.logger.debug(logging_text + step) + desc = await RO.delete("ns", RO_nsir_id) + RO_delete_action = desc["action_id"] + nsir_deployed_RO["vld_delete_action_id"] = RO_delete_action + nsir_deployed_RO["vld_status"] = "DELETING" + db_nsir_update["_admin.deployed"] = nsir_deployed + self.update_db_2("nsis", nsir_id, db_nsir_update) + if RO_delete_action: + # wait until NS is deleted from VIM + step = "Waiting ns deleted from VIM. RO_id={}".format( + RO_nsir_id + ) + self.logger.debug(logging_text + step) + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + nsir_deployed_RO["vld_id"] = None + nsir_deployed_RO["vld_status"] = "DELETED" + self.logger.debug( + logging_text + + "RO_ns_id={} already deleted".format(RO_nsir_id) + ) + elif e.http_code == 409: # conflict + failed_detail.append( + "RO_ns_id={} delete conflict: {}".format(RO_nsir_id, e) + ) + self.logger.debug(logging_text + failed_detail[-1]) + else: + failed_detail.append( + "RO_ns_id={} delete error: {}".format(RO_nsir_id, e) + ) + self.logger.error(logging_text + failed_detail[-1]) + + if failed_detail: + self.logger.error(logging_text + " ;".join(failed_detail)) + db_nsir_update["operational-status"] = "failed" + db_nsir_update["detailed-status"] = "Deletion errors " + "; ".join( + failed_detail + ) + db_nsilcmop_update["detailed-status"] = "; ".join(failed_detail) + db_nsilcmop_update[ + "operationState" + ] = nsilcmop_operation_state = "FAILED" + db_nsilcmop_update["statusEnteredTime"] = time() + else: + db_nsir_update["operational-status"] = "terminating" + db_nsir_update["config-status"] = "terminating" + db_nsir_update["_admin.nsiState"] = "NOT_INSTANTIATED" + db_nsilcmop_update[ + "operationState" + ] = nsilcmop_operation_state = "COMPLETED" + db_nsilcmop_update["statusEnteredTime"] = time() + if db_nsilcmop["operationParams"].get("autoremove"): + autoremove = True + db_nsir_update["detailed-status"] = "done" db_nsir_update["operational-status"] = "terminated" - db_nsir_update["config-status"] = "configured" - db_nsir_update["detailed-status"] = "done" - db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED" + db_nsir_update["config-status"] = "terminated" db_nsilcmop_update["statusEnteredTime"] = time() db_nsilcmop_update["detailed-status"] = "done" return except (LcmException, DbException) as e: - self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) + self.logger.error( + logging_text + "Exit Exception while '{}': {}".format(step, e) + ) exc = e except asyncio.CancelledError: - self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) exc = "Operation was cancelled" except Exception as e: exc = traceback.format_exc() - self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), - exc_info=True) + self.logger.critical( + logging_text + + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), + exc_info=True, + ) finally: if exc: if db_nsir: + db_nsir_update["_admin.deployed"] = nsir_deployed db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc) db_nsir_update["operational-status"] = "failed" if db_nsilcmop: - db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED" + db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format( + step, exc + ) + db_nsilcmop_update[ + "operationState" + ] = nsilcmop_operation_state = "FAILED" db_nsilcmop_update["statusEnteredTime"] = time() - if db_nsir: - db_nsir_update["_admin.nsilcmop"] = None - db_nsir_update["_admin.nsiState"] = "TERMINATED" - self.update_db_2("nsis", nsir_id, db_nsir_update) - if db_nsilcmop: - self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + try: + if db_nsir: + db_nsir_update["_admin.deployed"] = nsir_deployed + db_nsir_update["_admin.nsilcmop"] = None + self.update_db_2("nsis", nsir_id, db_nsir_update) + if db_nsilcmop: + self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + except DbException as e: + self.logger.error(logging_text + "Cannot update database: {}".format(e)) if nsilcmop_operation_state: try: - await self.msg.aiowrite("nsi", "terminated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id, - "operationState": nsilcmop_operation_state}) + await self.msg.aiowrite( + "nsi", + "terminated", + { + "nsir_id": nsir_id, + "nsilcmop_id": nsilcmop_id, + "operationState": nsilcmop_operation_state, + "autoremove": autoremove, + }, + loop=self.loop, + ) except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_terminate")