From 22f4f9c3f2fef11377202b95fe2333b78255f8de Mon Sep 17 00:00:00 2001 From: tierno Date: Mon, 11 Jun 2018 18:53:39 +0200 Subject: [PATCH] VDU scaling Change-Id: I30b7ef1839c08c1730463639c26cd5329e871878 Signed-off-by: tierno --- osm_lcm/ROclient.py | 283 ++++++++++--- osm_lcm/__init__.py | 4 +- osm_lcm/lcm.py | 984 +++++++++++++++++++++++++++++++++----------- 3 files changed, 960 insertions(+), 311 deletions(-) diff --git a/osm_lcm/ROclient.py b/osm_lcm/ROclient.py index 429d9f5..fe12f8b 100644 --- a/osm_lcm/ROclient.py +++ b/osm_lcm/ROclient.py @@ -296,12 +296,60 @@ class ROClient: else: return "BUILD", "VMs: {}/{}, networks: {}/{}".format(vm_done, vm_total, net_done, net_total) + @staticmethod + def check_action_status(action_descriptor): + """ + Inspect RO instance descriptor and indicates the status + :param action_descriptor: action instance descriptor obtained with self.show("ns", "action") + :return: status, message: status can be BUILD,ACTIVE,ERROR, message is a text message + """ + net_total = 0 + vm_total = 0 + net_done = 0 + vm_done = 0 + other_total = 0 + other_done = 0 + + for vim_action_set in action_descriptor["actions"]: + for vim_action in vim_action_set["vim_actions"]: + if vim_action["item"] == "instance_vms": + vm_total += 1 + elif vim_action["item"] == "instance_nets": + net_total += 1 + else: + other_total += 1 + if vim_action["status"] == "FAILED": + return "ERROR", vim_action["error_msg"] + elif vim_action["status"] in ("DONE", "SUPERSEDED"): + if vim_action["item"] == "instance_vms": + vm_done += 1 + elif vim_action["item"] == "instance_nets": + net_done += 1 + else: + other_done += 1 + + if net_total == net_done and vm_total == vm_done and other_total == other_done: + return "ACTIVE", "VMs {}, networks: {}, other: {} ".format(vm_total, net_total, other_total) + else: + return "BUILD", "VMs: {}/{}, networks: {}/{}, other: {}/{}".format(vm_done, vm_total, net_done, net_total, + other_done, other_total) + @staticmethod def get_ns_vnf_info(ns_descriptor): """ Get a dict with the VIM_id, ip_addresses, mac_addresses of every vnf and vdu :param ns_descriptor: instance descriptor obtained with self.show("ns", ) - :return: dict with {: {ip_address: XXXX, vdur:{ip_address: XXX, vim_id: XXXX}}} + :return: dict with: + : + ip_address: XXXX, + vdur: + : + ip_address: XXX + vim_id: XXXX + interfaces: + : + ip_address: XXX + mac_address: XXX """ ns_info = {} for vnf in ns_descriptor["vnfs"]: @@ -315,13 +363,18 @@ class ROClient: for vm in vnf["vms"]: vdur = { "vim_id": vm.get("vim_vm_id"), - "ip_address": vm.get("ip_address") + "ip_address": vm.get("ip_address"), + "interfaces": {} } for iface in vm["interfaces"]: if iface.get("type") == "mgmt" and not iface.get("ip_address"): raise ROClientException("ns member_vnf_index '{}' vm '{}' management interface '{}' has no IP " "address".format(vnf["member_vnf_index"], vm["vdu_osm_id"], iface["external_name"]), http_code=409) + vdur["interfaces"][iface["internal_name"]] = {"ip_address": iface.get("ip_address"), + "mac_address": iface.get("mac_address"), + "vim_id": iface.get("vim_interface_id"), + } vnfr_info["vdur"][vm["vdu_osm_id"]] = vdur ns_info[str(vnf["member_vnf_index"])] = vnfr_info return ns_info @@ -372,7 +425,7 @@ class ROClient: raise ROClientException("No {} found with name '{}'".format(item[:-1], item_id_name), http_code=404) return uuid - async def _get_item(self, session, item, item_id_name, all_tenants=False): + async def _get_item(self, session, item, item_id_name, extra_item=None, extra_item_id=None, all_tenants=False): if all_tenants: tenant_text = "/any" elif all_tenants is None: @@ -389,6 +442,10 @@ class ROClient: uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants) url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + if extra_item: + url += "/" + extra_item + if extra_item_id: + url += "/" + extra_item_id self.logger.debug("GET %s", url) with aiohttp.Timeout(self.timeout_short): async with session.get(url, headers=self.headers_req) as response: @@ -444,7 +501,7 @@ class ROClient: if not action: action = "" else: - action = "/".format(action) + action = "/{}".format(action) url = "{}{apiver}{tenant}/{item}{id}{action}".format(self.endpoint_url, apiver=api_version_text, tenant=tenant_text, item=item, id=uuid, action=action) @@ -534,6 +591,34 @@ class ROClient: raise ROClientException(response_text, http_code=response.status) return self._parse_yaml(response_text, response=True) + async def get_version(self): + """ + Obtain RO server version. + :return: a list with integers ["major", "minor", "release"]. Raises ROClientException on Error, + """ + try: + with aiohttp.ClientSession(loop=self.loop) as session: + url = "{}/version".format(self.endpoint_url) + self.logger.debug("RO GET %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + for word in str(response_text).split(" "): + if "." in word: + version_text, _, _ = word.partition("-") + return list(map(int, version_text.split("."))) + raise ROClientException("Got invalid version text: '{}'".format(response_text), http_code=500) + except aiohttp.errors.ClientOSError as e: + raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) + except Exception as e: + raise ROClientException("Got invalid version text: '{}'; causing exception {}".format(response_text, e), + http_code=500) + async def get_list(self, item, all_tenants=False, filter_by=None): """ Obtain a list of items filtering by the specigy filter_by. @@ -560,12 +645,17 @@ class ROClient: return content except aiohttp.errors.ClientOSError as e: raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) - async def show(self, item, item_id_name=None, all_tenants=False): + async def show(self, item, item_id_name=None, extra_item=None, extra_item_id=None, all_tenants=False): """ Obtain the information of an item from its id or name :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param extra_item: if supplied, it is used to add to the URL. + Can be 'action' if item='ns'; 'networks' or'images' if item='vim' + :param extra_item_id: if supplied, it is used get details of a concrete extra_item. :param all_tenants: True if not filtering by tenant. Only allowed for admin :return: dictionary with the information or raises ROClientException on Error, NotFound, found several """ @@ -580,10 +670,13 @@ class ROClient: all_tenants = False with aiohttp.ClientSession(loop=self.loop) as session: - content = await self._get_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) + content = await self._get_item(session, self.client_to_RO[item], item_id_name, extra_item=extra_item, + extra_item_id=extra_item_id, all_tenants=all_tenants) return remove_envelop(item, content) except aiohttp.errors.ClientOSError as e: raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) async def delete(self, item, item_id_name=None, all_tenants=False): """ @@ -603,10 +696,13 @@ class ROClient: return await self._del_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) except aiohttp.errors.ClientOSError as e: raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) async def edit(self, item, item_id_name, descriptor=None, descriptor_format=None, **kwargs): """ Edit an item :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns', 'vim' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided :param descriptor_format: Can be 'json' or 'yaml' :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type @@ -648,6 +744,8 @@ class ROClient: return remove_envelop(item, outdata) except aiohttp.errors.ClientOSError as e: raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) async def create(self, item, descriptor=None, descriptor_format=None, **kwargs): """ @@ -691,66 +789,133 @@ class ROClient: return remove_envelop(item, outdata) except aiohttp.errors.ClientOSError as e: raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) + + async def create_action(self, item, item_id_name, descriptor=None, descriptor_format=None, **kwargs): + """ + Performs an action over an item + :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided + :param descriptor_format: Can be 'json' or 'yaml' + :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type + keys can be a dot separated list to specify elements inside dict + :return: dictionary with the information or raises ROClientException on Error + """ + try: + if isinstance(descriptor, str): + descriptor = self._parse(descriptor, descriptor_format) + elif descriptor: + pass + else: + descriptor = {} + + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + desc = remove_envelop(item, descriptor) + + # Override descriptor with kwargs + if kwargs: + desc = self.update_descriptor(desc, kwargs) + + all_tenants = False + if item in ('tenant', 'vim'): + all_tenants = None + + action = None + if item == "vims": + action = "sdn_mapping" + elif item in ("vim_account", "ns"): + action = "action" + + # create_desc = self._create_envelop(item, desc) + create_desc = desc + + with aiohttp.ClientSession(loop=self.loop) as session: + _all_tenants = all_tenants + if item == 'vim': + _all_tenants = True + # item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name, + # all_tenants=_all_tenants) + outdata = await self._create_item(session, self.client_to_RO[item], create_desc, + item_id_name=item_id_name, # item_id_name=item_id + action=action, all_tenants=_all_tenants) + return remove_envelop(item, outdata) + except aiohttp.errors.ClientOSError as e: + raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) async def attach_datacenter(self, datacenter=None, descriptor=None, descriptor_format=None, **kwargs): - if isinstance(descriptor, str): - descriptor = self._parse(descriptor, descriptor_format) - elif descriptor: - pass - else: - descriptor = {} - desc = remove_envelop("vim", descriptor) - - # # check that exist - # uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True) - # tenant_text = "/" + self._get_tenant() - if kwargs: - desc = self.update_descriptor(desc, kwargs) - - if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"): - raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be provided") - create_desc = self._create_envelop("vim", desc) - payload_req = yaml.safe_dump(create_desc) - with aiohttp.ClientSession(loop=self.loop) as session: - # check that exist - item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=True) - await self._get_tenant(session) + try: + if isinstance(descriptor, str): + descriptor = self._parse(descriptor, descriptor_format) + elif descriptor: + pass + else: + descriptor = {} + desc = remove_envelop("vim", descriptor) - url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=self.tenant, - datacenter=item_id) - self.logger.debug("RO POST %s %s", url, payload_req) - with aiohttp.Timeout(self.timeout_large): - async with session.post(url, headers=self.headers_req, data=payload_req) as response: - response_text = await response.read() - self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - - response_desc = self._parse_yaml(response_text, response=True) - desc = remove_envelop("vim", response_desc) - return desc + # # check that exist + # uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True) + # tenant_text = "/" + self._get_tenant() + if kwargs: + desc = self.update_descriptor(desc, kwargs) + + if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"): + raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be provided") + create_desc = self._create_envelop("vim", desc) + payload_req = yaml.safe_dump(create_desc) + with aiohttp.ClientSession(loop=self.loop) as session: + # check that exist + item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=True) + await self._get_tenant(session) + + url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=self.tenant, + datacenter=item_id) + self.logger.debug("RO POST %s %s", url, payload_req) + with aiohttp.Timeout(self.timeout_large): + async with session.post(url, headers=self.headers_req, data=payload_req) as response: + response_text = await response.read() + self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + + response_desc = self._parse_yaml(response_text, response=True) + desc = remove_envelop("vim", response_desc) + return desc + except aiohttp.errors.ClientOSError as e: + raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) async def detach_datacenter(self, datacenter=None): # TODO replace the code with delete_item(vim_account,...) - with aiohttp.ClientSession(loop=self.loop) as session: - # check that exist - item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=False) - tenant = await self._get_tenant(session) - - url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=tenant, - datacenter=item_id) - self.logger.debug("RO DELETE %s", url) - with aiohttp.Timeout(self.timeout_large): - async with session.delete(url, headers=self.headers_req) as response: - response_text = await response.read() - self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - - response_desc = self._parse_yaml(response_text, response=True) - desc = remove_envelop("vim", response_desc) - return desc + try: + with aiohttp.ClientSession(loop=self.loop) as session: + # check that exist + item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=False) + tenant = await self._get_tenant(session) + + url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=tenant, + datacenter=item_id) + self.logger.debug("RO DELETE %s", url) + with aiohttp.Timeout(self.timeout_large): + async with session.delete(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + + response_desc = self._parse_yaml(response_text, response=True) + desc = remove_envelop("vim", response_desc) + return desc + except aiohttp.errors.ClientOSError as e: + raise ROClientException(e, http_code=504) + except asyncio.TimeoutError: + raise ROClientException("Timeout", http_code=504) # TODO convert to asyncio # DATACENTERS diff --git a/osm_lcm/__init__.py b/osm_lcm/__init__.py index 038e365..4b516fb 100644 --- a/osm_lcm/__init__.py +++ b/osm_lcm/__init__.py @@ -1,2 +1,2 @@ -version = '0.1.7' -date_version = '2018-06-27' +version = '0.1.8' +date_version = '2018-06-28' diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 83efbca..8c10554 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -9,6 +9,7 @@ import logging.handlers import getopt import functools import sys +import traceback from osm_common import dbmemory from osm_common import dbmongo from osm_common import fslocal @@ -27,6 +28,7 @@ from time import time __author__ = "Alfonso Tierno" +min_RO_version = [0, 5, 69] class LcmException(Exception): @@ -114,7 +116,9 @@ class Lcm: # or with list(map(int, version.split("."))) if N2VC_version < "0.0.2": raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version)) + try: + # TODO check database version if config["database"]["driver"] == "mongo": self.db = dbmongo.DbMongo() self.db.db_connect(config["database"]) @@ -145,6 +149,18 @@ class Lcm: self.logger.critical(str(e), exc_info=True) raise LcmException(str(e)) + async def check_RO_version(self): + try: + RO = ROclient.ROClient(self.loop, **self.ro_config) + RO_version = await RO.get_version() + if RO_version < min_RO_version: + raise LcmException("Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format( + *RO_version, *min_RO_version + )) + except ROclient.ROClientException as e: + self.logger.critical("Error while conneting to osm/RO " + str(e), exc_info=True) + raise LcmException(str(e)) + def update_db(self, item, _id, _desc): try: self.db.replace(item, _id, _desc) @@ -152,10 +168,20 @@ class Lcm: self.logger.error("Updating {} _id={}: {}".format(item, _id, e)) def update_db_2(self, item, _id, _desc): + """ + Updates database with _desc information. Upon success _desc is cleared + :param item: + :param _id: + :param _desc: + :return: + """ + if not _desc: + return try: self.db.set_one(item, {"_id": _id}, _desc) + _desc.clear() except DbException as e: - self.logger.error("Updating {} _id={}: {}".format(item, _id, e)) + self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) async def vim_create(self, vim_content, order_id): vim_id = vim_content["_id"] @@ -501,6 +527,10 @@ class Lcm: ci_file = None vdu.pop("cloud-init-file", None) vdu["cloud-init"] = clout_init_content + # remnove unused by RO configuration, monitoring, scaling + vnfd_RO.pop("vnf-configuration", None) + vnfd_RO.pop("monitoring-param", None) + vnfd_RO.pop("scaling-group-descriptor", None) return vnfd_RO except FsException as e: raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e)) @@ -532,13 +562,14 @@ class Lcm: """ nsr_id = None nslcmop_id = None - update_nsr = update_nslcmop = False + db_nsr_update = {} + db_nslcmop_update = {} try: nsr_id = db_nsr["_id"] nslcmop_id = db_nslcmop["_id"] nsr_lcm = db_nsr["_admin"]["deployed"] - ns_action = db_nslcmop["lcmOperationType"] - logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id, + ns_operation = db_nslcmop["lcmOperationType"] + logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_operation, nslcmop_id, member_vnf_index) if task: @@ -551,24 +582,26 @@ class Lcm: exc = task.exception() if exc: self.logger.error(logging_text + " task Exception={}".format(exc)) - if ns_action in ("instantiate", "terminate"): + if ns_operation in ("instantiate", "terminate"): nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error" + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = \ + "error" nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc) - elif ns_action == "action": - db_nslcmop["operationState"] = "FAILED" - db_nslcmop["detailed-status"] = str(exc) - db_nslcmop["statusEnteredTime"] = time() - update_nslcmop = True + db_nsr_update[ + "_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(exc) + elif ns_operation == "action": + db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["detailed-status"] = str(exc) + db_nslcmop_update["statusEnteredTime"] = time() return else: self.logger.debug(logging_text + " task Done") # TODO revise with Adam if action is finished and ok when task is done - if ns_action == "action": - db_nslcmop["operationState"] = "COMPLETED" - db_nslcmop["detailed-status"] = "Done" - db_nslcmop["statusEnteredTime"] = time() - update_nslcmop = True + if ns_operation == "action": + db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nslcmop_update["statusEnteredTime"] = time() # task is Done, but callback is still ongoing. So ignore return elif status: @@ -576,7 +609,9 @@ class Lcm: if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status: return # same status, ignore nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = status nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message) + db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(message) else: self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True) return @@ -600,37 +635,36 @@ class Lcm: if all_active: self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, member_vnf_index)) - db_nsr["config-status"] = "configured" - db_nsr["detailed-status"] = "done" - db_nslcmop["operationState"] = "COMPLETED" - db_nslcmop["detailed-status"] = "Done" - db_nslcmop["statusEnteredTime"] = time() + db_nsr_update["config-status"] = "configured" + db_nsr_update["detailed-status"] = "done" + db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nslcmop_update["statusEnteredTime"] = time() elif n2vc_error_text: - db_nsr["config-status"] = "failed" + db_nsr_update["config-status"] = "failed" error_text = "fail configuring " + ";".join(n2vc_error_text) - db_nsr["detailed-status"] = error_text - db_nslcmop["operationState"] = "FAILED_TEMP" - db_nslcmop["detailed-status"] = error_text - db_nslcmop["statusEnteredTime"] = time() + db_nsr_update["detailed-status"] = error_text + db_nslcmop_update["operationState"] = "FAILED_TEMP" + db_nslcmop_update["detailed-status"] = error_text + db_nslcmop_update["statusEnteredTime"] = time() else: cs = "configuring: " separator = "" for status, num in status_map.items(): cs += separator + "{}: {}".format(status, num) separator = ", " - db_nsr["config-status"] = cs - db_nsr["detailed-status"] = cs - db_nslcmop["detailed-status"] = cs - update_nsr = update_nslcmop = True + db_nsr_update["config-status"] = cs + db_nsr_update["detailed-status"] = cs + db_nslcmop_update["detailed-status"] = cs except Exception as e: self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True) finally: try: - if update_nslcmop: - self.update_db("nslcmops", nslcmop_id, db_nslcmop) - if update_nsr: - self.update_db("nsrs", nsr_id, db_nsr) + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) except Exception as e: self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format( member_vnf_index, e), exc_info=True) @@ -694,13 +728,134 @@ class Lcm: RO_ns_params["networks"][vld["name"]] = RO_vld return RO_ns_params + def ns_update_vnfr(self, db_vnfrs, ns_RO_info): + """ + Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated + :param db_vnfrs: + :param ns_RO_info: + :return: + """ + for vnf_index, db_vnfr in db_vnfrs.items(): + vnfr_deployed = ns_RO_info.get(vnf_index) + if not vnfr_deployed: + continue + vnfr_update = {} + db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnfr_deployed.get("ip_address") + for index, vdur in enumerate(db_vnfr["vdur"]): + vdu_deployed = vnfr_deployed["vdur"].get(vdur["vdu-id-ref"]) + if not vdu_deployed: + continue + vnfr_update["vdur.{}.vim-id".format(index)] = vdu_deployed.get("vim_id") + db_vnfr["vdur"][index]["vim-id"] = vnfr_update["vdur.{}.vim-id".format(index)] + vnfr_update["vdur.{}.ip-address".format(index)] = vdu_deployed.get("ip_address") + db_vnfr["vdur"][index]["ip-address"] = vnfr_update["vdur.{}.ip-address".format(index)] + for index2, interface in enumerate(vdur["interfaces"]): + iface_deployed = vdu_deployed["interfaces"].get(interface["name"]) + if not iface_deployed: + continue + db_vnfr["vdur"][index]["interfaces"][index2]["vim-id"] =\ + vnfr_update["vdur.{}.interfaces.{}.vim-id".format(index, index2)] = iface_deployed.get("vim_id") + db_vnfr["vdur"][index]["interfaces"][index2]["ip-address"] =\ + vnfr_update["vdur.{}.interfaces.{}.ip-address".format(index, index2)] = iface_deployed.get( + "ip_address") + db_vnfr["vdur"][index]["interfaces"][index2]["mac-address"] =\ + vnfr_update["vdur.{}.interfaces.{}.mac-address".format(index, index2)] = iface_deployed.get( + "mac_address") + self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update) + + def ns_update_vnfr_2(self, db_vnfrs, nsr_desc_RO): + """ + Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated + :param db_vnfrs: + :param nsr_desc_RO: + :return: + """ + for vnf_index, db_vnfr in db_vnfrs.items(): + for vnf_RO in nsr_desc_RO["vnfs"]: + if vnf_RO["member_vnf_index"] == vnf_index: + vnfr_update = {} + db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO.get("ip_address") + vdur_list = [] + for vdur_RO in vnf_RO.get("vms", ()): + vdur = { + "vim-id": vdur_RO.get("vim_vm_id"), + "ip-address": vdur_RO.get("ip_address"), + "vdu-id-ref": vdur_RO.get("vdu_osm_id"), + "name": vdur_RO.get("vim_name"), + "status": vdur_RO.get("status"), + "status-detailed": vdur_RO.get("error_msg"), + "interfaces": [] + } + + for interface_RO in vdur_RO.get("interfaces", ()): + vdur["interfaces"].append({ + "ip-address": interface_RO.get("ip_address"), + "mac-address": interface_RO.get("mac_address"), + "name": interface_RO.get("external_name"), + }) + vdur_list.append(vdur) + db_vnfr["vdur"] = vnfr_update["vdur"] = vdur_list + self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update) + break + + else: + raise LcmException("ns_update_vnfr_2: Not found member_vnf_index={} at RO info".format(vnf_index)) + + async def create_monitoring(self, nsr_id, vnf_member_index, vnfd_desc): + if not vnfd_desc.get("scaling-group-descriptor"): + return + for scaling_group in vnfd_desc["scaling-group-descriptor"]: + scaling_policy_desc = {} + scaling_desc = { + "ns_id": nsr_id, + "scaling_group_descriptor": { + "name": scaling_group["name"], + "scaling_policy": scaling_policy_desc + } + } + for scaling_policy in scaling_group.get("scaling-policy"): + scaling_policy_desc["scale_in_operation_type"] = scaling_policy_desc["scale_out_operation_type"] = \ + scaling_policy["scaling-type"] + scaling_policy_desc["threshold_time"] = scaling_policy["threshold-time"] + scaling_policy_desc["cooldown_time"] = scaling_policy["cooldown-time"] + scaling_policy_desc["scaling_criteria"] = [] + for scaling_criteria in scaling_policy.get("scaling-criteria"): + scaling_criteria_desc = {"scale_in_threshold": scaling_criteria.get("scale-in-threshold"), + "scale_out_threshold": scaling_criteria.get("scale-out-threshold"), + } + if not scaling_criteria.get("vnf-monitoring-param-ref"): + continue + for monitoring_param in vnfd_desc.get("monitoring-param", ()): + if monitoring_param["id"] == scaling_criteria["vnf-monitoring-param-ref"]: + scaling_criteria_desc["monitoring_param"] = { + "id": monitoring_param["id"], + "name": monitoring_param["name"], + "aggregation_type": monitoring_param.get("aggregation-type"), + "vdu_name": monitoring_param.get("vdu-ref"), + "vnf_member_index": vnf_member_index, + } + + scaling_policy_desc["scaling_criteria"].append(scaling_criteria_desc) + break + else: + self.logger.error( + "Task ns={} member_vnf_index={} Invalid vnfd vnf-monitoring-param-ref={} not in " + "monitoring-param list".format(nsr_id, vnf_member_index, + scaling_criteria["vnf-monitoring-param-ref"])) + + await self.msg.aiowrite("lcm_pm", "configure_scaling", scaling_desc, self.loop) + async def ns_instantiate(self, nsr_id, nslcmop_id): logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database db_nsr = None db_nslcmop = None - db_vnfr = {} + db_nsr_update = {} + db_nslcmop_update = {} + db_vnfrs = {} + RO_descriptor_number = 0 # number of descriptors created at RO + descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO exc = None try: step = "Getting nslcmop={} from db".format(nslcmop_id) @@ -714,7 +869,8 @@ class Lcm: for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"] - db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter) + step = "Getting vnfr={} of nsr={} from db".format(c_vnf["member-vnf-index"], nsr_id) + db_vnfrs[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter) if vnfd_id not in needed_vnfd: step = "Getting vnfd={} from db".format(vnfd_id) needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id}) @@ -727,149 +883,146 @@ class Lcm: "nsr_ip": {}, "VCA": {}, } - db_nsr["detailed-status"] = "creating" - db_nsr["operational-status"] = "init" + db_nsr_update["detailed-status"] = "creating" + db_nsr_update["operational-status"] = "init" RO = ROclient.ROClient(self.loop, **self.ro_config) # get vnfds, instantiate at RO for vnfd_id, vnfd in needed_vnfd.items(): - step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id) + step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id) # self.logger.debug(logging_text + step) - vnfd_id_RO = nsr_id + "." + vnfd_id[:200] + vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_id[:23]) + descriptor_id_2_RO[vnfd_id] = vnfd_id_RO + RO_descriptor_number += 1 # look if present vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO}) if vnfd_list: - nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"] + db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"] self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format( vnfd_id, vnfd_list[0]["uuid"])) else: vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO) desc = await RO.create("vnfd", descriptor=vnfd_RO) - nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] - db_nsr["_admin"]["nsState"] = "INSTANTIATED" + db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"] + db_nsr_update["_admin.nsState"] = "INSTANTIATED" self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format( vnfd_id, desc["uuid"])) - self.update_db("nsrs", nsr_id, db_nsr) + self.update_db_2("nsrs", nsr_id, db_nsr_update) # create nsd at RO nsd_id = nsd["id"] - step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id) + step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_id) # self.logger.debug(logging_text + step) - nsd_id_RO = nsr_id + "." + nsd_id[:200] - nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO}) + RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_id[:23]) + descriptor_id_2_RO[nsd_id] = RO_osm_nsd_id + RO_descriptor_number += 1 + nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id}) if nsd_list: - nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"] + db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"] self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format( - nsd_id, nsd_list[0]["uuid"])) + nsd_id, RO_nsd_uuid)) else: nsd_RO = deepcopy(nsd) - nsd_RO["id"] = nsd_id_RO + nsd_RO["id"] = RO_osm_nsd_id nsd_RO.pop("_id", None) nsd_RO.pop("_admin", None) for c_vnf in nsd_RO["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] - c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200] + c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id] desc = await RO.create("nsd", descriptor=nsd_RO) - db_nsr["_admin"]["nsState"] = "INSTANTIATED" - nsr_lcm["RO"]["nsd_id"] = desc["uuid"] - self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_id, desc["uuid"])) - self.update_db("nsrs", nsr_id, db_nsr) + db_nsr_update["_admin.nsState"] = "INSTANTIATED" + db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"] + self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_id, RO_nsd_uuid)) + self.update_db_2("nsrs", nsr_id, db_nsr_update) # Crate ns at RO # if present use it unless in error status - RO_nsr_id = nsr_lcm["RO"].get("nsr_id") + RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id") if RO_nsr_id: try: - step = db_nsr["detailed-status"] = "Looking for existing ns at RO" + step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO" # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id)) desc = await RO.show("ns", RO_nsr_id) except ROclient.ROClientException as e: if e.http_code != HTTPStatus.NOT_FOUND: raise - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None + RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None if RO_nsr_id: ns_status, ns_status_info = RO.check_ns_status(desc) - nsr_lcm["RO"]["nsr_status"] = ns_status + db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status if ns_status == "ERROR": - step = db_nsr["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id) + step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id) self.logger.debug(logging_text + step) await RO.delete("ns", RO_nsr_id) - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None + RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None if not RO_nsr_id: - step = db_nsr["detailed-status"] = "Creating ns at RO" + step = db_nsr_update["detailed-status"] = "Creating ns at RO" # self.logger.debug(logging_text + step) RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params")) desc = await RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], - scenario=nsr_lcm["RO"]["nsd_id"]) - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"] - db_nsr["_admin"]["nsState"] = "INSTANTIATED" - nsr_lcm["RO"]["nsr_status"] = "BUILD" + scenario=RO_nsd_uuid) + RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"] + db_nsr_update["_admin.nsState"] = "INSTANTIATED" + db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD" self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"])) - self.update_db("nsrs", nsr_id, db_nsr) + self.update_db_2("nsrs", nsr_id, db_nsr_update) # update VNFR vimAccount step = "Updating VNFR vimAcccount" - for vnf_index, vnfr in db_vnfr.items(): + for vnf_index, vnfr in db_vnfrs.items(): if vnfr.get("vim-account-id"): continue - vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"] + vnfr_update = {"vim-account-id": db_nsr["instantiate_params"]["vimAccountId"]} if db_nsr["instantiate_params"].get("vnf"): for vnf_params in db_nsr["instantiate_params"]["vnf"]: if vnf_params.get("member-vnf-index") == vnf_index: if vnf_params.get("vimAccountId"): - vnfr["vim-account-id"] = vnf_params.get("vimAccountId") + vnfr_update["vim-account-id"] = vnf_params.get("vimAccountId") break - self.update_db("vnfrs", vnfr["_id"], vnfr) + self.update_db_2("vnfrs", vnfr["_id"], vnfr_update) # wait until NS is ready - step = ns_status_detailed = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id) - db_nsr["detailed-status"] = ns_status_detailed + step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id) + detailed_status_old = None self.logger.debug(logging_text + step) + deployment_timeout = 2 * 3600 # Two hours while deployment_timeout > 0: desc = await RO.show("ns", RO_nsr_id) ns_status, ns_status_info = RO.check_ns_status(desc) - nsr_lcm["RO"]["nsr_status"] = ns_status + db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status if ns_status == "ERROR": raise ROclient.ROClientException(ns_status_info) elif ns_status == "BUILD": - db_nsr_detailed_status_old = db_nsr["detailed-status"] - db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info) - if db_nsr_detailed_status_old != db_nsr["detailed-status"]: - self.update_db("nsrs", nsr_id, db_nsr) + detailed_status = ns_status_detailed + "; {}".format(ns_status_info) elif ns_status == "ACTIVE": - step = "Waiting for management IP address from VIM" + step = detailed_status = "Waiting for management IP address reported by the VIM" try: - ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc) + nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc) break except ROclient.ROClientException as e: if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT raise e else: assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + if detailed_status != detailed_status_old: + detailed_status_old = db_nsr_update["detailed-status"] = detailed_status + self.update_db_2("nsrs", nsr_id, db_nsr_update) await asyncio.sleep(5, loop=self.loop) deployment_timeout -= 5 if deployment_timeout <= 0: raise ROclient.ROClientException("Timeout waiting ns to be ready") step = "Updating VNFRs" - for vnf_index, vnfr_deployed in ns_RO_info.items(): - vnfr = db_vnfr[vnf_index] - vnfr["ip-address"] = vnfr_deployed.get("ip_address") - for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items(): - for vdur in vnfr["vdur"]: - if vdur["vdu-id-ref"] == vdu_id: - vdur["vim-id"] = vdu_deployed.get("vim_id") - vdur["ip-address"] = vdu_deployed.get("ip_address") - break - self.update_db("vnfrs", vnfr["_id"], vnfr) + # self.ns_update_vnfr(db_vnfrs, ns_RO_info) + self.ns_update_vnfr_2(db_vnfrs, desc) db_nsr["detailed-status"] = "Configuring vnfr" - self.update_db("nsrs", nsr_id, db_nsr) + self.update_db_2("nsrs", nsr_id, db_nsr_update) # The parameters we'll need to deploy a charm number_to_configure = 0 @@ -899,7 +1052,7 @@ class Lcm: ) # Setup the runtime parameters for this VNF - params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"] + params['rw_mgmt_ip'] = db_vnfrs[vnf_index]["ip-address"] # ns_name will be ignored in the current version of N2VC # but will be implemented for the next point release. @@ -909,14 +1062,16 @@ class Lcm: vnf_index, vnfd['name'], ) - - nsr_lcm["VCA"][vnf_index] = { + if not nsr_lcm.get("VCA"): + nsr_lcm["VCA"] = {} + nsr_lcm["VCA"][vnf_index] = db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = { "model": model_name, "application": application_name, "operational-status": "init", "detailed-status": "", "vnfd_id": vnfd_id, } + self.update_db_2("nsrs", nsr_id, db_nsr_update) self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm)) @@ -939,13 +1094,9 @@ class Lcm: db_nsr, db_nslcmop, vnf_index)) self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task - # TODO: Make this call inside deploy() - # Login to the VCA. If there are multiple calls to login(), - # subsequent calls will be a nop and return immediately. - await self.n2vc.login() - step = "Looking for needed vnfd to configure" self.logger.debug(logging_text + step) + for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] vnf_index = str(c_vnf["member-vnf-index"]) @@ -962,6 +1113,10 @@ class Lcm: if 'initial-config-primitive' in vnf_config: params['initial-config-primitive'] = vnf_config['initial-config-primitive'] + # Login to the VCA. If there are multiple calls to login(), + # subsequent calls will be a nop and return immediately. + step = "connecting to N2VC to configure vnf {}".format(vnf_index) + await self.n2vc.login() deploy() number_to_configure += 1 @@ -978,43 +1133,57 @@ class Lcm: params['initial-config-primitive'] = vdu_config['initial-config-primitive'] if proxy_charm: + step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index) + await self.n2vc.login() deploy() number_to_configure += 1 if number_to_configure: - db_nsr["config-status"] = "configuring" - db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure) - db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure) + db_nsr_update["config-status"] = "configuring" + db_nsr_update["operational-status"] = "running" + db_nsr_update["detailed-status"] = "configuring: init: {}".format(number_to_configure) + db_nslcmop_update["detailed-status"] = "configuring: init: {}".format(number_to_configure) else: - db_nslcmop["operationState"] = "COMPLETED" - db_nslcmop["statusEnteredTime"] = time() - db_nslcmop["detailed-status"] = "done" - db_nsr["config-status"] = "configured" - db_nsr["detailed-status"] = "done" - db_nsr["operational-status"] = "running" - self.update_db("nsrs", nsr_id, db_nsr) - self.update_db("nslcmops", nslcmop_id, db_nslcmop) - self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id)) - return nsr_lcm + db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["statusEnteredTime"] = time() + db_nslcmop_update["detailed-status"] = "done" + db_nsr_update["config-status"] = "configured" + db_nsr_update["detailed-status"] = "done" + db_nsr_update["operational-status"] = "running" + step = "Sending monitoring parameters to PM" + # for c_vnf in nsd["constituent-vnfd"]: + # await self.create_monitoring(nsr_id, c_vnf["member-vnf-index"], needed_vnfd[c_vnf["vnfd-id-ref"]]) + try: + await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + + self.logger.debug(logging_text + "Exit") + return except (ROclient.ROClientException, DbException, LcmException) as e: self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" except Exception as e: + exc = traceback.format_exc() self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), exc_info=True) - exc = e finally: if exc: if db_nsr: - db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc) - db_nsr["operational-status"] = "failed" - self.update_db("nsrs", nsr_id, db_nsr) + db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc) + db_nsr_update["operational-status"] = "failed" if db_nslcmop: - db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nslcmop["operationState"] = "FAILED" - db_nslcmop["statusEnteredTime"] = time() - self.update_db("nslcmops", nslcmop_id, db_nslcmop) + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) async def ns_terminate(self, nsr_id, nslcmop_id): logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) @@ -1022,15 +1191,18 @@ class Lcm: db_nsr = None db_nslcmop = None exc = None - step = "Getting nsr, nslcmop from db" failed_detail = [] # annotates all failed error messages vca_task_list = [] vca_task_dict = {} + db_nsr_update = {} + db_nslcmop_update = {} try: + step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + step = "Getting nsr={} from db".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) # nsd = db_nsr["nsd"] - nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"]) + nsr_lcm = deepcopy(db_nsr["_admin"].get("deployed")) if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED": return # TODO ALF remove @@ -1038,98 +1210,110 @@ class Lcm: # #TODO check if VIM is creating and wait # RO_vim_id = db_vim["_admin"]["deployed"]["RO"] - db_nsr_update = { - "operational-status": "terminating", - "config-status": "terminating", - "detailed-status": "Deleting charms", - } - self.update_db_2("nsrs", nsr_id, db_nsr_update) + db_nsr_update["operational-status"] = "terminating" + db_nsr_update["config-status"] = "terminating" - try: - self.logger.debug(logging_text + step) - for vnf_index, deploy_info in nsr_lcm["VCA"].items(): - if deploy_info and deploy_info.get("application"): - task = asyncio.ensure_future( - self.n2vc.RemoveCharms( - deploy_info['model'], - deploy_info['application'], - # self.n2vc_callback, - # db_nsr, - # db_nslcmop, - # vnf_index, + if nsr_lcm and nsr_lcm.get("VCA"): + try: + step = "Scheduling configuration charms removing" + db_nsr_update["detailed-status"] = "Deleting charms" + self.logger.debug(logging_text + step) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + for vnf_index, deploy_info in nsr_lcm["VCA"].items(): + if deploy_info and deploy_info.get("application"): + task = asyncio.ensure_future( + self.n2vc.RemoveCharms( + deploy_info['model'], + deploy_info['application'], + # self.n2vc_callback, + # db_nsr, + # db_nslcmop, + # vnf_index, + ) ) - ) - vca_task_list.append(task) - vca_task_dict[vnf_index] = task - # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'], - # deploy_info['application'], None, db_nsr, - # db_nslcmop, vnf_index)) - self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task - except Exception as e: - self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e)) - # remove from RO + vca_task_list.append(task) + vca_task_dict[vnf_index] = task + # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'], + # deploy_info['application'], None, db_nsr, + # db_nslcmop, vnf_index)) + self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task + except Exception as e: + self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e)) + # remove from RO + RO_fail = False RO = ROclient.ROClient(self.loop, **self.ro_config) # Delete ns - RO_nsr_id = nsr_lcm["RO"].get("nsr_id") - if RO_nsr_id: + if nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsr_id"): + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] try: - step = db_nsr["detailed-status"] = "Deleting ns at RO" + step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO" self.logger.debug(logging_text + step) await RO.delete("ns", RO_nsr_id) - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" + db_nsr_update["_admin.deployed.RO.nsr_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" except ROclient.ROClientException as e: if e.http_code == 404: # not found - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" + db_nsr_update["_admin.deployed.RO.nsr_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id)) elif e.http_code == 409: # conflict failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e)) self.logger.debug(logging_text + failed_detail[-1]) + RO_fail = True else: failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e)) self.logger.error(logging_text + failed_detail[-1]) + RO_fail = True # Delete nsd - RO_nsd_id = nsr_lcm["RO"]["nsd_id"] - if RO_nsd_id: + if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"): + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] try: - step = db_nsr["detailed-status"] = "Deleting nsd at RO" + step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\ + "Deleting nsd at RO" await RO.delete("nsd", RO_nsd_id) self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id)) - nsr_lcm["RO"]["nsd_id"] = None + db_nsr_update["_admin.deployed.RO.nsd_id"] = None except ROclient.ROClientException as e: if e.http_code == 404: # not found - nsr_lcm["RO"]["nsd_id"] = None + db_nsr_update["_admin.deployed.RO.nsd_id"] = None self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id)) elif e.http_code == 409: # conflict failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e)) self.logger.debug(logging_text + failed_detail[-1]) + RO_fail = True else: failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e)) self.logger.error(logging_text + failed_detail[-1]) + RO_fail = True - for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): - if not RO_vnfd_id: - continue - try: - step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id) - await RO.delete("vnfd", RO_vnfd_id) - self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id)) - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - except ROclient.ROClientException as e: - if e.http_code == 404: # not found - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id)) - elif e.http_code == 409: # conflict - failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e)) - self.logger.debug(logging_text + failed_detail[-1]) - else: - failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e)) - self.logger.error(logging_text + failed_detail[-1]) + if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("vnfd_id"): + for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + if not RO_vnfd_id: + continue + try: + step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\ + "Deleting vnfd={} at RO".format(vnf_id) + await RO.delete("vnfd", RO_vnfd_id) + self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id)) + db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None + self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id)) + elif e.http_code == 409: # conflict + failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) + else: + failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e)) + self.logger.error(logging_text + failed_detail[-1]) if vca_task_list: + db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\ + "Waiting for deletion of configuration charms" + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + self.update_db_2("nsrs", nsr_id, db_nsr_update) await asyncio.wait(vca_task_list, timeout=300) for vnf_index, task in vca_task_dict.items(): if task.cancelled(): @@ -1139,7 +1323,7 @@ class Lcm: if exc: failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc)) else: - nsr_lcm["VCA"][vnf_index] = None + db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = None else: # timeout # TODO Should it be cancelled?!! task.cancel() @@ -1147,41 +1331,40 @@ class Lcm: if failed_detail: self.logger.error(logging_text + " ;".join(failed_detail)) - db_nsr_update = { - "operational-status": "failed", - "detailed-status": "Deletion errors " + "; ".join(failed_detail), - "_admin.deployed": nsr_lcm - } - db_nslcmop_update = { - "detailed-status": "; ".join(failed_detail), - "operationState": "FAILED", - "statusEnteredTime": time() - } + db_nsr_update["operational-status"] = "failed" + db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail) + db_nslcmop_update["detailed-status"] = "; ".join(failed_detail) + db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() elif db_nslcmop["operationParams"].get("autoremove"): self.db.del_one("nsrs", {"_id": nsr_id}) + db_nsr_update.clear() self.db.del_list("nslcmops", {"nsInstanceId": nsr_id}) + db_nslcmop_update.clear() self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id}) self.logger.debug(logging_text + "Delete from database") else: - db_nsr_update = { - "operational-status": "terminated", - "detailed-status": "Done", - "_admin.deployed": nsr_lcm, - "_admin.nsState": "NOT_INSTANTIATED" - } - db_nslcmop_update = { - "detailed-status": "Done", - "operationState": "COMPLETED", - "statusEnteredTime": time() - } + db_nsr_update["operational-status"] = "terminated" + db_nsr_update["detailed-status"] = "Done" + db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" + db_nslcmop_update["detailed-status"] = "Done" + db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["statusEnteredTime"] = time() + try: + await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) self.logger.debug(logging_text + "Exit") except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" except Exception as e: + exc = traceback.format_exc() self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) - exc = e finally: if exc and db_nslcmop: db_nslcmop_update = { @@ -1194,6 +1377,53 @@ class Lcm: if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + async def _ns_execute_primitive(self, db_deployed, member_vnf_index, primitive, primitive_params): + vca_deployed = db_deployed["VCA"].get(member_vnf_index) + if not vca_deployed: + raise LcmException("charm for member_vnf_index={} is not deployed".format(member_vnf_index)) + model_name = vca_deployed.get("model") + application_name = vca_deployed.get("application") + if not model_name or not application_name: + raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index)) + if vca_deployed["operational-status"] != "active": + raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format( + member_vnf_index, vca_deployed["operational-status"])) + callback = None # self.n2vc_callback + callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None] + await self.n2vc.login() + task = asyncio.ensure_future( + self.n2vc.ExecutePrimitive( + model_name, + application_name, + primitive, callback, + *callback_args, + **primitive_params + ) + ) + # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, + # db_nsr, db_nslcmop, member_vnf_index)) + # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task + # wait until completed with timeout + await asyncio.wait((task,), timeout=600) + + result = "FAILED" # by default + result_detail = "" + if task.cancelled(): + result_detail = "Task has been cancelled" + elif task.done(): + exc = task.exception() + if exc: + result_detail = str(exc) + else: + # TODO revise with Adam if action is finished and ok when task is done or callback is needed + result = "COMPLETED" + result_detail = "Done" + else: # timeout + # TODO Should it be cancelled?!! + task.cancel() + result_detail = "timeout" + return result, result_detail + async def ns_action(self, nsr_id, nslcmop_id): logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") @@ -1210,54 +1440,9 @@ class Lcm: vnf_index = db_nslcmop["operationParams"]["member_vnf_index"] # TODO check if ns is in a proper status - vca_deployed = nsr_lcm["VCA"].get(vnf_index) - if not vca_deployed: - raise LcmException("charm for member_vnf_index={} is not deployed".format(vnf_index)) - model_name = vca_deployed.get("model") - application_name = vca_deployed.get("application") - if not model_name or not application_name: - raise LcmException("charm for member_vnf_index={} is not properly deployed".format(vnf_index)) - if vca_deployed["operational-status"] != "active": - raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format( - vnf_index, vca_deployed["operational-status"])) primitive = db_nslcmop["operationParams"]["primitive"] primitive_params = db_nslcmop["operationParams"]["primitive_params"] - callback = None # self.n2vc_callback - callback_args = () # [db_nsr, db_nslcmop, vnf_index, None] - await self.n2vc.login() - task = asyncio.ensure_future( - self.n2vc.ExecutePrimitive( - model_name, - application_name, - primitive, callback, - *callback_args, - **primitive_params - ) - ) - # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, - # db_nsr, db_nslcmop, vnf_index)) - # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task - # wait until completed with timeout - await asyncio.wait((task,), timeout=300) - - result = "FAILED" # by default - result_detail = "" - if task.cancelled(): - db_nslcmop["detailed-status"] = "Task has been cancelled" - elif task.done(): - exc = task.exception() - if exc: - result_detail = str(exc) - else: - self.logger.debug(logging_text + " task Done") - # TODO revise with Adam if action is finished and ok when task is done or callback is needed - result = "COMPLETED" - result_detail = "Done" - else: # timeout - # TODO Should it be cancelled?!! - task.cancel() - result_detail = "timeout" - + result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index, primitive, primitive_params) db_nslcmop_update = { "detailed-status": result_detail, "operationState": result, @@ -1269,9 +1454,12 @@ class Lcm: except (DbException, LcmException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" except Exception as e: + exc = traceback.format_exc() self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) - exc = e finally: if exc and db_nslcmop: db_nslcmop_update = { @@ -1282,6 +1470,288 @@ class Lcm: if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + async def ns_scale(self, nsr_id, nslcmop_id): + logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nsr = None + db_nslcmop = None + db_nslcmop_update = {} + db_nsr_update = {} + exc = None + try: + step = "Getting nslcmop from database" + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + step = "Getting nsr from database" + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + step = "Parsing scaling parameters" + nsr_lcm = db_nsr["_admin"].get("deployed") + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"] + scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"] + scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"] + scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy") + + step = "Getting vnfr from database" + db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}) + step = "Getting vnfd from database" + db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) + step = "Getting scaling-group-descriptor" + for scaling_descriptor in db_vnfd["scaling-group-descriptor"]: + if scaling_descriptor["name"] == scaling_group: + break + else: + raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present " + "at vnfd:scaling-group-descriptor".format(scaling_group)) + cooldown_time = 0 + for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()): + cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0) + if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"): + break + + # TODO check if ns is in a proper status + step = "Sending scale order to RO" + nb_scale_op = 0 + if not db_nsr["_admin"].get("scaling-group"): + self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]}) + admin_scale_index = 0 + else: + for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]): + if admin_scale_info["name"] == scaling_group: + nb_scale_op = admin_scale_info.get("nb-scale-op", 0) + break + RO_scaling_info = [] + vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []} + if scaling_type == "SCALE_OUT": + # count if max-instance-count is reached + if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None: + max_instance_count = int(scaling_descriptor["max-instance-count"]) + if nb_scale_op >= max_instance_count: + raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the" + " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)) + nb_scale_op = nb_scale_op + 1 + vdu_scaling_info["scaling_direction"] = "OUT" + vdu_scaling_info["vdu-create"] = {} + for vdu_scale_info in scaling_descriptor["vdu"]: + RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index, + "type": "create", "count": vdu_scale_info.get("count", 1)}) + vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1) + elif scaling_type == "SCALE_IN": + # count if min-instance-count is reached + if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None: + min_instance_count = int(scaling_descriptor["min-instance-count"]) + if nb_scale_op <= min_instance_count: + raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the " + "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)) + nb_scale_op = nb_scale_op - 1 + vdu_scaling_info["scaling_direction"] = "IN" + vdu_scaling_info["vdu-delete"] = {} + for vdu_scale_info in scaling_descriptor["vdu"]: + RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index, + "type": "delete", "count": vdu_scale_info.get("count", 1)}) + vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1) + + # update VDU_SCALING_INFO with the VDUs to delete ip_addresses + if vdu_scaling_info["scaling_direction"] == "IN": + for vdur in reversed(db_vnfr["vdur"]): + if vdu_scaling_info["vdu-delete"].get(vdur["vdu-id-ref"]): + vdu_scaling_info["vdu-delete"][vdur["vdu-id-ref"]] -= 1 + vdu_scaling_info["vdu"].append({ + "name": vdur["name"], + "vdu_id": vdur["vdu-id-ref"], + "interface": [] + }) + for interface in vdur["interfaces"]: + vdu_scaling_info["vdu"][-1]["interface"].append({ + "name": interface["name"], + "ip_address": interface["ip-address"], + "mac_address": interface.get("mac-address"), + }) + del vdu_scaling_info["vdu-delete"] + + # execute primitive service PRE-SCALING + step = "Executing pre-scale vnf-config-primitive" + if scaling_descriptor.get("scaling-config-action"): + for scaling_config_action in scaling_descriptor["scaling-config-action"]: + if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \ + and scaling_type == "SCALE_IN": + vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"] + step = db_nslcmop_update["detailed-status"] = \ + "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive) + # look for primitive + primitive_params = {} + for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()): + if config_primitive["name"] == vnf_config_primitive: + for parameter in config_primitive.get("parameter", ()): + if 'default-value' in parameter and \ + parameter['default-value'] == "": + primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info, + default_flow_style=True, + width=256) + break + else: + raise LcmException( + "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action" + "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-" + "primitive".format(scaling_group, config_primitive)) + result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index, + vnf_config_primitive, primitive_params) + self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( + vnf_config_primitive, result, result_detail)) + if result == "FAILED": + raise LcmException(result_detail) + + if RO_scaling_info: + RO = ROclient.ROClient(self.loop, **self.ro_config) + RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info}) + db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op + db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time() + # TODO mark db_nsr_update as scaling + # wait until ready + RO_nslcmop_id = RO_desc["instance_action_id"] + db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id + + RO_task_done = False + step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id) + detailed_status_old = None + self.logger.debug(logging_text + step) + + deployment_timeout = 1 * 3600 # One hours + while deployment_timeout > 0: + if not RO_task_done: + desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action", + extra_item_id=RO_nslcmop_id) + ns_status, ns_status_info = RO.check_action_status(desc) + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + detailed_status = step + "; {}".format(ns_status_info) + elif ns_status == "ACTIVE": + RO_task_done = True + step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id) + self.logger.debug(logging_text + step) + else: + assert False, "ROclient.check_action_status returns unknown {}".format(ns_status) + else: + desc = await RO.show("ns", RO_nsr_id) + ns_status, ns_status_info = RO.check_ns_status(desc) + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + detailed_status = step + "; {}".format(ns_status_info) + elif ns_status == "ACTIVE": + step = detailed_status = "Waiting for management IP address reported by the VIM" + try: + desc = await RO.show("ns", RO_nsr_id) + nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc) + break + except ROclient.ROClientException as e: + if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT + raise e + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + if detailed_status != detailed_status_old: + detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + + await asyncio.sleep(5, loop=self.loop) + deployment_timeout -= 5 + if deployment_timeout <= 0: + raise ROclient.ROClientException("Timeout waiting ns to be ready") + + step = "Updating VNFRs" + # self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, ns_RO_info) + self.ns_update_vnfr_2({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc) + + # update VDU_SCALING_INFO with the obtained ip_addresses + if vdu_scaling_info["scaling_direction"] == "OUT": + for vdur in reversed(db_vnfr["vdur"]): + if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]): + vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1 + vdu_scaling_info["vdu"].append({ + "name": vdur["name"], + "vdu_id": vdur["vdu-id-ref"], + "interface": [] + }) + for interface in vdur["interfaces"]: + vdu_scaling_info["vdu"][-1]["interface"].append({ + "name": interface["name"], + "ip_address": interface["ip-address"], + "mac_address": interface.get("mac-address"), + }) + del vdu_scaling_info["vdu-create"] + + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + + # execute primitive service POST-SCALING + step = "Executing post-scale vnf-config-primitive" + if scaling_descriptor.get("scaling-config-action"): + for scaling_config_action in scaling_descriptor["scaling-config-action"]: + if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \ + and scaling_type == "SCALE_OUT": + vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"] + step = db_nslcmop_update["detailed-status"] = \ + "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive) + # look for primitive + primitive_params = {} + for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()): + if config_primitive["name"] == vnf_config_primitive: + for parameter in config_primitive.get("parameter", ()): + if 'default-value' in parameter and \ + parameter['default-value'] == "": + primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info, + default_flow_style=True, + width=256) + break + else: + raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:" + "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not " + "match any vnf-cnfiguration:config-primitive".format(scaling_group, + config_primitive)) + result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index, + vnf_config_primitive, primitive_params) + self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( + vnf_config_primitive, result, result_detail)) + if result == "FAILED": + raise LcmException(result_detail) + + db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["statusEnteredTime"] = time() + db_nslcmop_update["detailed-status"] = "done" + db_nsr_update["detailed-status"] = "done" + try: + await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + if cooldown_time: + await asyncio.sleep(cooldown_time) + await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit Ok") + return + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) + finally: + if exc: + db_nsr_update = None + if db_nslcmop: + db_nslcmop_update = { + "detailed-status": "FAILED {}: {}".format(step, exc), + "operationState": "FAILED", + "statusEnteredTime": time(), + } + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + async def test(self, param=None): self.logger.debug("Starting/Ending test task: {}".format(param)) @@ -1405,6 +1875,16 @@ class Lcm: self.lcm_ns_tasks[nsr_id] = {} self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task} continue + elif command == "scale": + # self.logger.debug("Update NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns_scale(nsr_id, nslcmop_id)) + if nsr_id not in self.lcm_ns_tasks: + self.lcm_ns_tasks[nsr_id] = {} + self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_scale": task} + continue elif command == "show": try: db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) @@ -1482,6 +1962,10 @@ class Lcm: def start(self): self.loop = asyncio.get_event_loop() + + # check RO version + self.loop.run_until_complete(self.check_RO_version()) + self.loop.run_until_complete(asyncio.gather( self.kafka_read(), self.kafka_ping() @@ -1578,7 +2062,7 @@ if __name__ == '__main__': exit(1) lcm = Lcm(config_file) lcm.start() - except getopt.GetoptError as e: + except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage() exit(1) -- 2.25.1