From f100459221b95cdaaa543793623e556a9abd4852 Mon Sep 17 00:00:00 2001 From: "selvi.j" Date: Fri, 29 Apr 2022 05:42:35 +0000 Subject: [PATCH] Feature 10926 - Subscription feature for SOL003 VNF-LCM Added the subscription and notification implementation code for SOL003 Life cycle management interface Change-Id: Id73f83f4bf1bbbd07f0b3844a9a89442f8cdfb95 Signed-off-by: selvi.j --- osm_nbi/engine.py | 2 + osm_nbi/nbi.py | 6 ++ osm_nbi/notifications.py | 124 ++++++++++++++++++++++- osm_nbi/osm_vnfm/vnf_instance_actions.py | 26 +++++ osm_nbi/osm_vnfm/vnf_instances.py | 10 ++ osm_nbi/osm_vnfm/vnf_subscription.py | 65 ++++++++++++ osm_nbi/subscriptions.py | 34 ++++++- osm_nbi/validation.py | 57 +++++++++++ 8 files changed, 321 insertions(+), 3 deletions(-) create mode 100644 osm_nbi/osm_vnfm/vnf_subscription.py diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index 7afaa22..37f1fb2 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -55,6 +55,7 @@ from osm_nbi.instance_topics import ( from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic from osm_nbi.pmjobs_topics import PmJobsTopic from osm_nbi.subscription_topics import NslcmSubscriptionsTopic +from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic from base64 import b64encode from os import urandom # , path from threading import Lock @@ -88,6 +89,7 @@ class Engine(object): "nslcm_subscriptions": NslcmSubscriptionsTopic, "vnf_instances": VnfInstances, "vnflcmops": VnfLcmOpTopic, + "vnflcm_subscriptions": VnflcmSubscriptionsTopic, # [NEW_TOPIC]: add an entry here # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters } diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index f57258f..f19f44d 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -510,6 +510,12 @@ valid_url_methods = { "ROLE_PERMISSION": "vnf_instances:opps:id:" }, }, + "subscriptions": {"METHODS": ("GET", "POST"), + "ROLE_PERMISSION": "vnflcm_subscriptions:", + "": {"METHODS": ("GET", "DELETE"), + "ROLE_PERMISSION": "vnflcm_subscriptions:id:" + } + }, } }, "nst": { diff --git a/osm_nbi/notifications.py b/osm_nbi/notifications.py index 7b681a1..47a24ba 100644 --- a/osm_nbi/notifications.py +++ b/osm_nbi/notifications.py @@ -348,11 +348,18 @@ class NsLcmNotification(NotificationBase): :param event_details: dict containing raw data of event occured. :return: List of interested subscribers for occurred event. """ + notification_type = [ + "NsLcmOperationOccurrenceNotification", + "NsChangeNotification", + "NsIdentifierCreationNotification", + "NsIdentifierDeletionNotification" + ] filter_q = { "identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"], - } + "notificationType": notification_type + } if op_state: filter_q["operationStates"].append(op_state) if command: @@ -369,6 +376,121 @@ class NsLcmNotification(NotificationBase): return subscribers +class VnfLcmNotification(NotificationBase): + # SOL003 response model for vnflcm notifications + response_models = { + "VnfLcmOperationOccurrenceNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "notificationStatus", + "operationState", + "vnfInstanceId", + "operation", + "isAutomaticInvocation", + "vnfLcmOpOccId", + "affectedVnfcs", + "affectedVirtualLinks", + "affectedExtLinkPorts", + "affectedVirtualStorages", + "changedInfo", + "changedExtConnectivity", + "modificationsTriggeredByVnfPkgChange", + "error", + "_links" + }, + "VnfIdentifierCreationNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "vnfInstanceId", + "_links" + }, + "VnfIdentifierDeletionNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "vnfInstanceId", + "_links" + }, + } + + def __init__(self, db) -> None: + """ + Constructor of VnfLcmNotification class. + :param db: Database handler. + """ + super().__init__(db) + self.subscriber_collection = "mapped_subscriptions" + + def get_models(self) -> dict: + """ + Returns the SOL003 model of notification class + :param None + :return: dict of SOL003 data model + """ + return self.response_models + + def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list: + """ + Formats the raw event details from kafka message and subscriber details. + :param subscribers: A list of subscribers whom the event needs to be notified. + :param event_details: A dict containing all meta data of event. + :return: + """ + notification_id = str(uuid4()) + event_timestamp = time.time() + event_operation = event_details["command"] + for subscriber in subscribers: + subscriber["id"] = notification_id + subscriber["timeStamp"] = event_timestamp + subscriber["subscriptionId"] = subscriber["reference"] + subscriber["operation"] = event_operation + del subscriber["reference"] + del subscriber["_id"] + subscriber.update(event_details["params"]) + return subscribers + + def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str, + event_details: dict) -> list: + """ + Queries database and returns list of subscribers. + :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc) + :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed. + :param command: the command for event. + :param op_state: the operation state of VNF. + :param event_details: dict containing raw data of event occurred. + :return: List of interested subscribers for occurred event. + """ + notification_type = [ + "VnfIdentifierCreationNotification", + "VnfLcmOperationOccurrenceNotification", + "VnfIdentifierDeletionNotification" + ] + filter_q = { + "identifier": [vnfd_id, vnf_instance_id], + "operationStates": ["ANY"], + "operationTypes": ["ANY"], + "notificationType": notification_type + } + if op_state: + filter_q["operationStates"].append(op_state) + if command: + filter_q["operationTypes"].append(command) + subscribers = [] + try: + subscribers = self.db.get_list(self.subscriber_collection, filter_q) + subscribers = self._format_vnflcm_subscribers(subscribers, event_details) + except Exception as e: + error_text = type(e).__name__ + ": " + str(e) + self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text)) + finally: + return subscribers + + class NsdNotification(NotificationBase): def __init__(self, db): """ diff --git a/osm_nbi/osm_vnfm/vnf_instance_actions.py b/osm_nbi/osm_vnfm/vnf_instance_actions.py index 93c91c5..947f0b7 100644 --- a/osm_nbi/osm_vnfm/vnf_instance_actions.py +++ b/osm_nbi/osm_vnfm/vnf_instance_actions.py @@ -133,6 +133,31 @@ class NewVnfLcmOp(BaseMethod): } return formatted_indata + def notify_operation(self, session, _id, lcm_operation, op_id): + """ + Formats the operation message params and sends to kafka + :param session: contains "username", "admin", "force", "public", "project_id", "set_project" + :param _id: vnf instance id + :param lcm_operation: lcm operation type of a VNF (instantiate, scale, terminate) + :param op_id: lcm operation id of a VNF + :return: None + """ + vnfInstanceId = _id + operation = lcm_operation + nslcmop_rec = self.nslcmoptopic.show(session, op_id) + operation_status = nslcmop_rec["operationState"] + vnfr = self.vnfrtopic.show(session, vnfInstanceId) + links = {"self": "/osm/vnflcm/v1/vnf_lcm_op_occs/" + op_id, + "vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfInstanceId} + params = {"vnfdId": vnfr["vnfd-ref"], + "vnfInstanceId": vnfInstanceId, + "operationState": operation_status, + "vnfLcmOpOccId": op_id, + "_links": links + } + self.msg.write("vnf", operation, params) + return None + def action(self, rollback, session, indata=None, kwargs=None, headers=None): """ Creates an new lcm operation. @@ -149,6 +174,7 @@ class NewVnfLcmOp(BaseMethod): indata["vnfInstanceId"] = vnfr.get("nsr-id-ref") indata = self.__get_formatted_indata(session, indata) op_id, _ = self.nslcmoptopic.new(rollback, session, indata, kwargs, headers) + self.notify_operation(session, vnfInstanceId, lcm_operation, op_id) return op_id, _ diff --git a/osm_nbi/osm_vnfm/vnf_instances.py b/osm_nbi/osm_vnfm/vnf_instances.py index a41f6d5..a6a57fc 100644 --- a/osm_nbi/osm_vnfm/vnf_instances.py +++ b/osm_nbi/osm_vnfm/vnf_instances.py @@ -224,6 +224,11 @@ class NewVnfInstance(BaseMethod): nsr_id, _ = self.__create_nsr(rollback, session, formatted_indata, kwargs, headers) nsr = self.nsrtopic.show(session, nsr_id) vnfr_id = nsr['constituent-vnfr-ref'][0] + if vnfr_id: + links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfr_id} + indata["vnfInstanceId"] = vnfr_id + indata["_links"] = links + self.msg.write("vnf", "create", indata) return vnfr_id, None def action(self, rollback, session, indata=None, kwargs=None, headers=None): @@ -305,5 +310,10 @@ class DeleteVnfInstance(BaseMethod): ns_id = vnfr.get("nsr-id-ref") nsr = self.nsrtopic.show(session, ns_id) nsd_to_del = nsr['nsd']['_id'] + links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + _id} + params = {"vnfdId": vnfr["vnfd-ref"], + "vnfInstanceId": _id, + "_links": links} + self.msg.write("vnf", "delete", params) self.nsrtopic.delete(session, ns_id, dry_run, not_send_msg) return self.nsdtopic.delete(session, nsd_to_del, dry_run, not_send_msg) diff --git a/osm_nbi/osm_vnfm/vnf_subscription.py b/osm_nbi/osm_vnfm/vnf_subscription.py new file mode 100644 index 0000000..5371a44 --- /dev/null +++ b/osm_nbi/osm_vnfm/vnf_subscription.py @@ -0,0 +1,65 @@ +# Copyright 2021 Selvi Jayaraman (Tata Elxsi) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__author__ = "Selvi Jayaraman " + +from osm_nbi.subscription_topics import CommonSubscriptions +from osm_nbi.validation import vnf_subscription + +class VnflcmSubscriptionsTopic(CommonSubscriptions): + schema_new = vnf_subscription + def _subscription_mapper(self, _id, data, table): + """ + Performs data transformation on subscription request + :param _id: subscription reference id + :param data: data to be transformed + :param table: table in which transformed data are inserted + """ + formatted_data = [] + formed_data = { + "reference": data.get("_id"), + "CallbackUri": data.get("CallbackUri") + } + if data.get("authentication"): + formed_data.update({"authentication": data.get("authentication")}) + if data.get("filter"): + if data["filter"].get("VnfInstanceSubscriptionFilter"): + key = list(data["filter"]["VnfInstanceSubscriptionFilter"].keys())[0] + identifier = data["filter"]["VnfInstanceSubscriptionFilter"][key] + formed_data.update({"identifier": identifier}) + if data["filter"].get("notificationTypes"): + for elem in data["filter"].get("notificationTypes"): + update_dict = formed_data.copy() + update_dict["notificationType"] = elem + if elem == "VnfIdentifierCreationNotification": + update_dict["operationTypes"] = "CREATE" + update_dict["operationStates"] = "ANY" + formatted_data.append(update_dict) + elif elem == "VnfIdentifierDeletionNotification": + update_dict["operationTypes"] = "DELETE" + update_dict["operationStates"] = "ANY" + formatted_data.append(update_dict) + elif elem == "VnfLcmOperationOccurrenceNotification": + if "operationTypes" in data["filter"].keys(): + update_dict["operationTypes"] = data["filter"]["operationTypes"] + else: + update_dict["operationTypes"] = "ANY" + if "operationStates" in data["filter"].keys(): + update_dict["operationStates"] = data["filter"]["operationStates"] + else: + update_dict["operationStates"] = "ANY" + formatted_data.append(update_dict) + self.db.create_list(table, formatted_data) + return None diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index 6810ccd..1f172dd 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka from osm_common.dbbase import DbException from osm_common.msgbase import MsgException from osm_nbi.engine import EngineException -from osm_nbi.notifications import NsLcmNotification +from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification __author__ = "Alfonso Tierno " @@ -70,6 +70,7 @@ class SubscriptionThread(threading.Thread): "method": "delete", } self.nslcm = None + self.vnflcm = None async def start_kafka(self): # timeout_wait_for_kafka = 3*60 @@ -84,6 +85,7 @@ class SubscriptionThread(threading.Thread): ) await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop) await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop) + await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop) if not kafka_working: self.logger.critical("kafka is working again") kafka_working = True @@ -104,7 +106,7 @@ class SubscriptionThread(threading.Thread): self.logger.debug("Starting non-admin subscription task") self.aiomain_task = asyncio.ensure_future( self.msg.aioread( - ("ns", "nsi"), + ("ns", "nsi", "vnf"), loop=self.loop, aiocallback=self._msg_callback, ), @@ -178,6 +180,7 @@ class SubscriptionThread(threading.Thread): ) ) self.nslcm = NsLcmNotification(self.db) + self.vnflcm = VnfLcmNotification(self.db) except (DbException, MsgException) as e: raise SubscriptionException(str(e), http_code=e.http_code) @@ -272,6 +275,33 @@ class SubscriptionThread(threading.Thread): self.logger.debug( "Message can not be used for notification of nslcm" ) + elif topic == "vnf": + if isinstance(params, dict): + vnfd_id = params["vnfdId"] + vnf_instance_id = params["vnfInstanceId"] + if command == "create" or command == "delete": + op_state = command + else: + op_state = params["operationState"] + event_details = { + "topic": topic, + "command": command.upper(), + "params": params, + } + subscribers = self.vnflcm.get_subscribers( + vnfd_id, + vnf_instance_id, + command.upper(), + op_state, + event_details + ) + if subscribers: + asyncio.ensure_future( + self.vnflcm.send_notifications( + subscribers, loop=self.loop + ), + loop=self.loop + ) elif topic == "nsi": if command == "terminated" and params["operationState"] in ( "COMPLETED", diff --git a/osm_nbi/validation.py b/osm_nbi/validation.py index 3373b84..3fec364 100644 --- a/osm_nbi/validation.py +++ b/osm_nbi/validation.py @@ -1309,6 +1309,63 @@ subscription = { "required": ["CallbackUri"], } +vnflcmsub_schema = { + "title": "vnflcmsubscription input schema", + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "VnfInstanceSubscriptionFilter": { + "type": "object", + "properties": { + "vnfdIds": {"type": "array"}, + "vnfInstanceIds": {"type": "array"}, + }, + }, + "notificationTypes": { + "type": "array", + "items": { + "enum": [ + "VnfIdentifierCreationNotification", + "VnfLcmOperationOccurrenceNotification", + "VnfIdentifierDeletionNotification" + ] + } + }, + "operationTypes": { + "type": "array", + "items": { + "enum": [ + "INSTANTIATE", "SCALE", "SCALE_TO_LEVEL", "CHANGE_FLAVOUR", "TERMINATE", + "HEAL", "OPERATE", "CHANGE_EXT_CONN", "MODIFY_INFO", "CREATE_SNAPSHOT", + "REVERT_TO_SNAPSHOT", "CHANGE_VNFPKG" + ] + } + }, + "operationStates": { + "type": "array", + "items": { + "enum": [ + "STARTING", "PROCESSING", "COMPLETED", "FAILED_TEMP", "FAILED", + "ROLLING_BACK", "ROLLED_BACK" + ] + } + } + }, + "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"] + } + +vnf_subscription = { + "title": "vnf subscription input schema", + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "filter": vnflcmsub_schema, + "CallbackUri": description_schema, + "authentication": authentication_schema + }, + "required": ["filter", "CallbackUri"] +} + class ValidationError(Exception): def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY): -- 2.25.1