X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_nbi%2Finstance_topics.py;h=74815cad367b5e8235abe89fdbab507b39cf511c;hb=544a2ae8b0b950b55f29c3f0a223ffe4874285e5;hp=87b186ec8c3cfafb552f69c5ea18414c990f8166;hpb=4568a372eb5a204e04d917213de03ec51f9110c1;p=osm%2FNBI.git diff --git a/osm_nbi/instance_topics.py b/osm_nbi/instance_topics.py index 87b186e..74815ca 100644 --- a/osm_nbi/instance_topics.py +++ b/osm_nbi/instance_topics.py @@ -14,6 +14,7 @@ # limitations under the License. # import logging +import json from uuid import uuid4 from http import HTTPStatus from time import time @@ -25,6 +26,7 @@ from osm_nbi.validation import ( ns_terminate, ns_action, ns_scale, + ns_update, nsi_instantiate, ) from osm_nbi.base_topic import ( @@ -239,14 +241,16 @@ class NsrTopic(BaseTopic): where_, k ) ) - if "." in k or "$" in k: + if "$" in k: raise EngineException( - "Invalid param at {}:{}. Keys must not contain dots or $".format( + "Invalid param at {}:{}. Keys must not contain $ symbol".format( where_, k ) ) if isinstance(v, (dict, tuple, list)): additional_params[k] = "!!yaml " + safe_dump(v) + if kdu_name: + additional_params = json.dumps(additional_params) if descriptor: for df in descriptor.get("df", []): @@ -334,7 +338,7 @@ class NsrTopic(BaseTopic): # Create VNFRs needed_vnfds = {} # TODO: Change for multiple df support - vnf_profiles = nsd.get("df", [[]])[0].get("vnf-profile", ()) + vnf_profiles = nsd.get("df", [{}])[0].get("vnf-profile", ()) for vnfp in vnf_profiles: vnfd_id = vnfp.get("vnfd-id") vnf_index = vnfp.get("id") @@ -345,6 +349,9 @@ class NsrTopic(BaseTopic): ) if vnfd_id not in needed_vnfds: vnfd = self._get_vnfd_from_db(vnfd_id, session) + if "revision" in vnfd["_admin"]: + vnfd["revision"] = vnfd["_admin"]["revision"] + vnfd.pop("_admin") needed_vnfds[vnfd_id] = vnfd nsr_descriptor["vnfd-id"].append(vnfd["_id"]) else: @@ -394,7 +401,6 @@ class NsrTopic(BaseTopic): _filter = self._get_project_filter(session) _filter["id"] = vnfd_id vnfd = self.db.get_one("vnfds", _filter, fail_on_empty=True, fail_on_more=True) - vnfd.pop("_admin") return vnfd def _add_nsr_to_db(self, nsr_descriptor, rollback, session): @@ -474,11 +480,11 @@ class NsrTopic(BaseTopic): "ssh-authorized-key": ns_request.get("ssh_keys"), # TODO remove "flavor": [], "image": [], + "affinity-or-anti-affinity-group": [], } ns_request["nsr_id"] = nsr_id if ns_request and ns_request.get("config-units"): nsr_descriptor["config-units"] = ns_request["config-units"] - # Create vld if nsd.get("virtual-link-desc"): nsr_vld = deepcopy(nsd.get("virtual-link-desc", [])) @@ -504,6 +510,7 @@ class NsrTopic(BaseTopic): ) vnfd = self._get_vnfd_from_db(vnf_profile.get("vnfd-id"), session) + vnfd.pop("_admin") for vdu in vnfd.get("vdu", ()): flavor_data = {} @@ -589,6 +596,29 @@ class NsrTopic(BaseTopic): image_data = self._get_image_data_from_vnfd(vnfd, alt_image) self._add_image_to_nsr(nsr_descriptor, image_data) + # Add Affinity or Anti-affinity group information to NSR + vdu_profiles = vnfd.get("df", [[]])[0].get("vdu-profile", ()) + affinity_group_prefix_name = "{}-{}".format( + nsr_descriptor["name"][:16], vnf_profile.get("id")[:16] + ) + + for vdu_profile in vdu_profiles: + affinity_group_data = {} + for affinity_group in vdu_profile.get( + "affinity-or-anti-affinity-group", () + ): + affinity_group_data = ( + self._get_affinity_or_anti_affinity_group_data_from_vnfd( + vnfd, affinity_group["id"] + ) + ) + affinity_group_data["member-vnf-index"] = vnf_profile.get("id") + self._add_affinity_or_anti_affinity_group_to_nsr( + nsr_descriptor, + affinity_group_data, + affinity_group_prefix_name, + ) + for vld in nsr_vld: vld["vnfd-connection-point-ref"] = all_vld_connection_point_data.get( vld.get("id"), [] @@ -598,6 +628,51 @@ class NsrTopic(BaseTopic): return nsr_descriptor + def _get_affinity_or_anti_affinity_group_data_from_vnfd( + self, vnfd, affinity_group_id + ): + """ + Gets affinity-or-anti-affinity-group info from df and returns the desired affinity group + """ + affinity_group = utils.find_in_list( + vnfd.get("df", [[]])[0].get("affinity-or-anti-affinity-group", ()), + lambda ag: ag["id"] == affinity_group_id, + ) + affinity_group_data = {} + if affinity_group: + if affinity_group.get("id"): + affinity_group_data["ag-id"] = affinity_group["id"] + if affinity_group.get("type"): + affinity_group_data["type"] = affinity_group["type"] + if affinity_group.get("scope"): + affinity_group_data["scope"] = affinity_group["scope"] + return affinity_group_data + + def _add_affinity_or_anti_affinity_group_to_nsr( + self, nsr_descriptor, affinity_group_data, affinity_group_prefix_name + ): + """ + Adds affinity-or-anti-affinity-group to nsr checking first it is not already added + """ + affinity_group = next( + ( + f + for f in nsr_descriptor["affinity-or-anti-affinity-group"] + if all(f.get(k) == affinity_group_data[k] for k in affinity_group_data) + ), + None, + ) + if not affinity_group: + affinity_group_data["id"] = str( + len(nsr_descriptor["affinity-or-anti-affinity-group"]) + ) + affinity_group_data["name"] = "{}-{}".format( + affinity_group_prefix_name, affinity_group_data["ag-id"][:32] + ) + nsr_descriptor["affinity-or-anti-affinity-group"].append( + affinity_group_data + ) + def _get_image_data_from_vnfd(self, vnfd, sw_image_id): sw_image_desc = utils.find_in_list( vnfd.get("sw-image-desc", ()), lambda sw: sw["id"] == sw_image_id @@ -660,6 +735,13 @@ class NsrTopic(BaseTopic): "connection-point": [], "ip-address": None, # mgmt-interface filled by LCM } + + # Revision backwards compatility. Only specify the revision in the record if + # the original VNFD has a revision. + if "revision" in vnfd: + vnfr_descriptor["revision"] = vnfd["revision"] + + vnf_k8s_namespace = ns_k8s_namespace if vnf_params: if vnf_params.get("k8s-namespace"): @@ -710,9 +792,14 @@ class NsrTopic(BaseTopic): if kdu_params and kdu_params.get("k8s-namespace"): kdu_k8s_namespace = kdu_params["k8s-namespace"] + kdu_deployment_name = "" + if kdu_params and kdu_params.get("kdu-deployment-name"): + kdu_deployment_name = kdu_params.get("kdu-deployment-name") + kdur = { "additionalParams": additional_params, "k8s-namespace": kdu_k8s_namespace, + "kdu-deployment-name": kdu_deployment_name, "kdu-name": kdu["name"], # TODO "name": "" Name of the VDU in the VIM "ip-address": None, # mgmt-interface filled by LCM @@ -762,6 +849,14 @@ class NsrTopic(BaseTopic): additional_params, vdu_params = self._format_additional_params( ns_request, vnf_index, vdu_id=vdu["id"], descriptor=vnfd ) + + try: + vdu_virtual_storage_descriptors = utils.filter_in_list( + vnfd.get("virtual-storage-desc", []), + lambda stg_desc: stg_desc["id"] in vdu["virtual-storage-desc"] + ) + except Exception: + vdu_virtual_storage_descriptors = [] vdur = { "vdu-id-ref": vdu["id"], # TODO "name": "" Name of the VDU in the VIM @@ -771,6 +866,7 @@ class NsrTopic(BaseTopic): "interfaces": [], "additionalParams": additional_params, "vdu-name": vdu["name"], + "virtual-storages": vdu_virtual_storage_descriptors } if vdu_params and vdu_params.get("config-units"): vdur["config-units"] = vdu_params["config-units"] @@ -925,6 +1021,49 @@ class NsrTopic(BaseTopic): if nsr_flavor_desc: vdur["ns-flavor-id"] = nsr_flavor_desc["id"] + # Adding Affinity groups information to vdur + try: + vdu_profile_affinity_group = utils.find_in_list( + vnfd.get("df")[0]["vdu-profile"], + lambda a_vdu: a_vdu["id"] == vdu["id"], + ) + except Exception: + vdu_profile_affinity_group = None + + if vdu_profile_affinity_group: + affinity_group_ids = [] + for affinity_group in vdu_profile_affinity_group.get( + "affinity-or-anti-affinity-group", () + ): + vdu_affinity_group = utils.find_in_list( + vdu_profile_affinity_group.get( + "affinity-or-anti-affinity-group", () + ), + lambda ag_fp: ag_fp["id"] == affinity_group["id"], + ) + nsr_affinity_group = utils.find_in_list( + nsr_descriptor["affinity-or-anti-affinity-group"], + lambda nsr_ag: ( + nsr_ag.get("ag-id") == vdu_affinity_group.get("id") + and nsr_ag.get("member-vnf-index") + == vnfr_descriptor.get("member-vnf-index-ref") + ), + ) + # Update Affinity Group VIM name if VDU instantiation parameter is present + if vnf_params and vnf_params.get("affinity-or-anti-affinity-group"): + vnf_params_affinity_group = utils.find_in_list( + vnf_params["affinity-or-anti-affinity-group"], + lambda vnfp_ag: ( + vnfp_ag.get("id") == vdu_affinity_group.get("id") + ), + ) + if vnf_params_affinity_group.get("vim-affinity-group-id"): + nsr_affinity_group[ + "vim-affinity-group-id" + ] = vnf_params_affinity_group["vim-affinity-group-id"] + affinity_group_ids.append(nsr_affinity_group["id"]) + vdur["affinity-or-anti-affinity-group-id"] = affinity_group_ids + if vdu_instantiation_level: count = vdu_instantiation_level.get("number-of-instances") else: @@ -933,9 +1072,9 @@ class NsrTopic(BaseTopic): for index in range(0, count): vdur = deepcopy(vdur) for iface in vdur["interfaces"]: - if iface.get("ip-address"): + if iface.get("ip-address") and index != 0: iface["ip-address"] = increment_ip_mac(iface["ip-address"]) - if iface.get("mac-address"): + if iface.get("mac-address") and index != 0: iface["mac-address"] = increment_ip_mac(iface["mac-address"]) vdur["_id"] = str(uuid4()) @@ -945,6 +1084,41 @@ class NsrTopic(BaseTopic): return vnfr_descriptor + def vca_status_refresh(self, session, ns_instance_content, filter_q): + """ + vcaStatus in ns_instance_content maybe stale, check if it is stale and create lcm op + to refresh vca status by sending message to LCM when it is stale. Ignore otherwise. + :param session: contains "username", "admin", "force", "public", "project_id", "set_project" + :param ns_instance_content: ns instance content + :param filter_q: dict: query parameter containing vcaStatus-refresh as true or false + :return: None + """ + time_now, time_delta = time(), time() - ns_instance_content["_admin"]["modified"] + force_refresh = isinstance(filter_q, dict) and filter_q.get('vcaStatusRefresh') == 'true' + threshold_reached = time_delta > 120 + if force_refresh or threshold_reached: + operation, _id = "vca_status_refresh", ns_instance_content["_id"] + ns_instance_content["_admin"]["modified"] = time_now + self.db.set_one(self.topic, {"_id": _id}, ns_instance_content) + nslcmop_desc = NsLcmOpTopic._create_nslcmop(_id, operation, None) + self.format_on_new(nslcmop_desc, session["project_id"], make_public=session["public"]) + nslcmop_desc["_admin"].pop("nsState") + self.msg.write("ns", operation, nslcmop_desc) + return + + def show(self, session, _id, filter_q=None, api_req=False): + """ + Get complete information on an ns instance. + :param session: contains "username", "admin", "force", "public", "project_id", "set_project" + :param _id: string, ns instance id + :param filter_q: dict: query parameter containing vcaStatusRefresh as true or false + :param api_req: True if this call is serving an external API request. False if serving internal request. + :return: dictionary, raise exception if not found. + """ + ns_instance_content = super().show(session, _id, api_req) + self.vca_status_refresh(session, ns_instance_content, filter_q) + return ns_instance_content + def edit(self, session, _id, indata=None, kwargs=None, content=None): raise EngineException( "Method edit called directly", HTTPStatus.INTERNAL_SERVER_ERROR @@ -981,6 +1155,7 @@ class NsLcmOpTopic(BaseTopic): operation_schema = { # mapping between operation and jsonschema to validate "instantiate": ns_instantiate, "action": ns_action, + "update": ns_update, "scale": ns_scale, "terminate": ns_terminate, } @@ -992,7 +1167,7 @@ class NsLcmOpTopic(BaseTopic): """ Check that user has enter right parameters for the operation :param session: contains "username", "admin", "force", "public", "project_id", "set_project" - :param operation: it can be: instantiate, terminate, action, TODO: update, heal + :param operation: it can be: instantiate, terminate, action, update. TODO: heal :param indata: descriptor with the parameters of the operation :return: None """ @@ -1000,6 +1175,8 @@ class NsLcmOpTopic(BaseTopic): self._check_action_ns_operation(indata, nsr) elif operation == "scale": self._check_scale_ns_operation(indata, nsr) + elif operation == "update": + self._check_update_ns_operation(indata, nsr) elif operation == "instantiate": self._check_instantiate_ns_operation(indata, nsr, session) @@ -1092,6 +1269,85 @@ class NsLcmOpTopic(BaseTopic): ) ) + def _check_update_ns_operation(self, indata, nsr) -> None: + """Validates the ns-update request according to updateType + + If updateType is CHANGE_VNFPKG: + - it checks the vnfInstanceId, whether it's available under ns instance + - it checks the vnfdId whether it matches with the vnfd-id in the vnf-record of specified VNF. + Otherwise exception will be raised. + + Args: + indata: includes updateType such as CHANGE_VNFPKG, + nsr: network service record + + Raises: + EngineException: + a meaningful error if given update parameters are not proper such as + "Error in validating ns-update request: does not match + with the vnfd-id of vnfinstance + http_code=HTTPStatus.UNPROCESSABLE_ENTITY" + + """ + try: + if indata["updateType"] == "CHANGE_VNFPKG": + # vnfInstanceId, nsInstanceId, vnfdId are mandatory + vnf_instance_id = indata["changeVnfPackageData"]["vnfInstanceId"] + ns_instance_id = indata["nsInstanceId"] + vnfd_id_2update = indata["changeVnfPackageData"]["vnfdId"] + + if vnf_instance_id not in nsr["constituent-vnfr-ref"]: + + raise EngineException( + f"Error in validating ns-update request: vnf {vnf_instance_id} does not " + f"belong to NS {ns_instance_id}", + http_code=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + # Getting vnfrs through the ns_instance_id + vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instance_id}) + constituent_vnfd_id = next( + ( + vnfr["vnfd-id"] + for vnfr in vnfrs + if vnfr["id"] == vnf_instance_id + ), + None, + ) + + # Check the given vnfd-id belongs to given vnf instance + if constituent_vnfd_id and (vnfd_id_2update != constituent_vnfd_id): + + raise EngineException( + f"Error in validating ns-update request: vnfd-id {vnfd_id_2update} does not " + f"match with the vnfd-id: {constituent_vnfd_id} of VNF instance: {vnf_instance_id}", + http_code=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + # Validating the ns update timeout + if ( + indata.get("timeout_ns_update") + and indata["timeout_ns_update"] < 300 + ): + raise EngineException( + "Error in validating ns-update request: {} second is not enough " + "to upgrade the VNF instance: {}".format( + indata["timeout_ns_update"], vnf_instance_id + ), + http_code=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + except ( + DbException, + AttributeError, + IndexError, + KeyError, + ValueError, + ) as e: + raise type(e)( + "Ns update request could not be processed with error: {}.".format(e) + ) + def _check_scale_ns_operation(self, indata, nsr): vnfd = self._get_vnfd_from_vnf_member_index( indata["scaleVnfData"]["scaleByStepData"]["member-vnf-index"], nsr["_id"] @@ -1162,7 +1418,14 @@ class NsLcmOpTopic(BaseTopic): "Invalid parameter member_vnf_index='{}' is not one of the " "nsd:constituent-vnfd".format(member_vnf_index) ) - vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False) + + ## Backwards compatibility: if there is no revision, get it from the one and only VNFD entry + if "revision" in vnfr: + vnfd_revision = vnfr["vnfd-id"] + ":" + str(vnfr["revision"]) + vnfd = self.db.get_one("vnfds_revisions", {"_id": vnfd_revision}, fail_on_empty=False) + else: + vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False) + if not vnfd: raise EngineException( "vnfd id={} has been deleted!. Operation cannot be performed".format( @@ -1286,6 +1549,18 @@ class NsLcmOpTopic(BaseTopic): ) vim_accounts.append(vim_account) + def _get_vim_account(self, vim_id: str, session): + try: + db_filter = self._get_project_filter(session) + db_filter["_id"] = vim_id + return self.db.get_one("vim_accounts", db_filter) + except Exception: + raise EngineException( + "Invalid vimAccountId='{}' not present for the project".format( + vim_id + ) + ) + def _check_valid_wim_account(self, wim_account, wim_accounts, session): if not isinstance(wim_account, str): return @@ -1548,6 +1823,68 @@ class NsLcmOpTopic(BaseTopic): # TODO check that this forcing is not incompatible with other forcing return ifaces_forcing_vim_network + def _update_vnfrs_from_nsd(self, nsr): + try: + nsr_id = nsr["_id"] + nsd = nsr["nsd"] + + step = "Getting vnf_profiles from nsd" + vnf_profiles = nsd.get("df", [{}])[0].get("vnf-profile", ()) + vld_fixed_ip_connection_point_data = {} + + step = "Getting ip-address info from vnf_profile if it exists" + for vnfp in vnf_profiles: + # Checking ip-address info from nsd.vnf_profile and storing + for vlc in vnfp.get("virtual-link-connectivity", ()): + for cpd in vlc.get("constituent-cpd-id", ()): + if cpd.get("ip-address"): + step = "Storing ip-address info" + vld_fixed_ip_connection_point_data.update({vlc.get("virtual-link-profile-id") + '.' + cpd.get("constituent-base-element-id"): { + "vnfd-connection-point-ref": cpd.get( + "constituent-cpd-id"), + "ip-address": cpd.get( + "ip-address")}}) + + # Inserting ip address to vnfr + if len(vld_fixed_ip_connection_point_data) > 0: + step = "Getting vnfrs" + vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + for item in vld_fixed_ip_connection_point_data.keys(): + step = "Filtering vnfrs" + vnfr = next(filter(lambda vnfr: vnfr["member-vnf-index-ref"] == item.split('.')[1], vnfrs), None) + if vnfr: + vnfr_update = {} + for vdur_index, vdur in enumerate(vnfr["vdur"]): + for iface_index, iface in enumerate(vdur["interfaces"]): + step = "Looking for matched interface" + if ( + iface.get("external-connection-point-ref") + == vld_fixed_ip_connection_point_data[item].get("vnfd-connection-point-ref") and + iface.get("ns-vld-id") == item.split('.')[0] + + ): + vnfr_update_text = "vdur.{}.interfaces.{}".format( + vdur_index, iface_index + ) + step = "Storing info in order to update vnfr" + vnfr_update[ + vnfr_update_text + ".ip-address" + ] = increment_ip_mac( + vld_fixed_ip_connection_point_data[item].get("ip-address"), + vdur.get("count-index", 0), ) + vnfr_update[vnfr_update_text + ".fixed-ip"] = True + + step = "updating vnfr at database" + self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, vnfr_update) + except ( + ValidationError, + EngineException, + DbException, + MsgException, + FsException, + ) as e: + raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) + def _update_vnfrs(self, session, rollback, nsr, indata): # get vnfr nsr_id = nsr["_id"] @@ -1560,15 +1897,14 @@ class NsLcmOpTopic(BaseTopic): # update vim-account-id vim_account = indata["vimAccountId"] - vca_id = indata.get("vcaId") + vca_id = self._get_vim_account(vim_account, session).get("vca") # check instantiate parameters for vnf_inst_params in get_iterable(indata.get("vnf")): if vnf_inst_params["member-vnf-index"] != member_vnf_index: continue if vnf_inst_params.get("vimAccountId"): vim_account = vnf_inst_params.get("vimAccountId") - if vnf_inst_params.get("vcaId"): - vca_id = vnf_inst_params.get("vcaId") + vca_id = self._get_vim_account(vim_account, session).get("vca") # get vnf.vdu.interface instantiation params to update vnfr.vdur.interfaces ip, mac for vdu_inst_param in get_iterable(vnf_inst_params.get("vdu")): @@ -1748,7 +2084,7 @@ class NsLcmOpTopic(BaseTopic): """ Creates a ns-lcm-opp content to be stored at database. :param nsr_id: internal id of the instance - :param operation: instantiate, terminate, scale, action, ... + :param operation: instantiate, terminate, scale, action, update ... :param params: user parameters for the operation :return: dictionary following SOL005 format """ @@ -1804,7 +2140,7 @@ class NsLcmOpTopic(BaseTopic): :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param indata: descriptor with the parameters of the operation. It must contains among others nsInstanceId: _id of the nsr to perform the operation - operation: it can be: instantiate, terminate, action, TODO: update, heal + operation: it can be: instantiate, terminate, action, update TODO: heal :param kwargs: used to override the indata descriptor :param headers: http request headers :return: id of the nslcmops @@ -1866,8 +2202,13 @@ class NsLcmOpTopic(BaseTopic): HTTPStatus.CONFLICT, ) self._check_ns_operation(session, nsr, operation, indata) + if (indata.get("primitive_params")): + indata["primitive_params"] = json.dumps(indata["primitive_params"]) + elif (indata.get("additionalParamsForVnf")): + indata["additionalParamsForVnf"] = json.dumps(indata["additionalParamsForVnf"]) if operation == "instantiate": + self._update_vnfrs_from_nsd(nsr) self._update_vnfrs(session, rollback, nsr, indata) nslcmop_desc = self._create_nslcmop(nsInstanceId, operation, indata)