Feature 10926 - Subscription feature for SOL003 VNF-LCM 72/11972/2
authorselvi.j <selvi.j@tataelxsi.co.in>
Fri, 29 Apr 2022 05:42:35 +0000 (05:42 +0000)
committerjayaramans <selvi.j@tataelxsi.co.in>
Tue, 24 May 2022 11:29:46 +0000 (13:29 +0200)
Added the subscription and notification implementation code
for SOL003 Life cycle management interface

Change-Id: Id73f83f4bf1bbbd07f0b3844a9a89442f8cdfb95
Signed-off-by: selvi.j <selvi.j@tataelxsi.co.in>
osm_nbi/engine.py
osm_nbi/nbi.py
osm_nbi/notifications.py
osm_nbi/osm_vnfm/vnf_instance_actions.py
osm_nbi/osm_vnfm/vnf_instances.py
osm_nbi/osm_vnfm/vnf_subscription.py [new file with mode: 0644]
osm_nbi/subscriptions.py
osm_nbi/validation.py

index 7afaa22..37f1fb2 100644 (file)
@@ -55,6 +55,7 @@ from osm_nbi.instance_topics import (
 from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
 from osm_nbi.pmjobs_topics import PmJobsTopic
 from osm_nbi.subscription_topics import NslcmSubscriptionsTopic
+from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic
 from base64 import b64encode
 from os import urandom  # , path
 from threading import Lock
@@ -88,6 +89,7 @@ class Engine(object):
         "nslcm_subscriptions": NslcmSubscriptionsTopic,
         "vnf_instances": VnfInstances,
         "vnflcmops": VnfLcmOpTopic,
+        "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
         # [NEW_TOPIC]: add an entry here
         # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
     }
index f57258f..f19f44d 100644 (file)
@@ -510,6 +510,12 @@ valid_url_methods = {
                                         "ROLE_PERMISSION": "vnf_instances:opps:id:"
                                         },
                                },
+            "subscriptions": {"METHODS": ("GET", "POST"),
+                              "ROLE_PERMISSION": "vnflcm_subscriptions:",
+                              "<ID>": {"METHODS": ("GET", "DELETE"),
+                                       "ROLE_PERMISSION": "vnflcm_subscriptions:id:"
+                                       }
+                              },
         }
     },
     "nst": {
index 7b681a1..47a24ba 100644 (file)
@@ -348,11 +348,18 @@ class NsLcmNotification(NotificationBase):
         :param event_details: dict containing raw data of event occured.
         :return: List of interested subscribers for occurred event.
         """
+        notification_type = [
+            "NsLcmOperationOccurrenceNotification",
+            "NsChangeNotification",
+            "NsIdentifierCreationNotification",
+            "NsIdentifierDeletionNotification"
+        ]
         filter_q = {
             "identifier": [nsd_id, ns_instance_id],
             "operationStates": ["ANY"],
             "operationTypes": ["ANY"],
-        }
+            "notificationType": notification_type
+            }
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
@@ -369,6 +376,121 @@ class NsLcmNotification(NotificationBase):
             return subscribers
 
 
+class VnfLcmNotification(NotificationBase):
+    # SOL003 response model for vnflcm notifications
+    response_models = {
+        "VnfLcmOperationOccurrenceNotification": {
+            "id",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "notificationStatus",
+            "operationState",
+            "vnfInstanceId",
+            "operation",
+            "isAutomaticInvocation",
+            "vnfLcmOpOccId",
+            "affectedVnfcs",
+            "affectedVirtualLinks",
+            "affectedExtLinkPorts",
+            "affectedVirtualStorages",
+            "changedInfo",
+            "changedExtConnectivity",
+            "modificationsTriggeredByVnfPkgChange",
+            "error",
+            "_links"
+        },
+        "VnfIdentifierCreationNotification": {
+            "id",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "vnfInstanceId",
+            "_links"
+        },
+        "VnfIdentifierDeletionNotification": {
+            "id",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "vnfInstanceId",
+            "_links"
+        },
+    }
+
+    def __init__(self, db) -> None:
+        """
+        Constructor of VnfLcmNotification class.
+        :param db: Database handler.
+        """
+        super().__init__(db)
+        self.subscriber_collection = "mapped_subscriptions"
+
+    def get_models(self) -> dict:
+        """
+        Returns the SOL003 model of notification class
+        :param None
+        :return: dict of SOL003 data model
+        """
+        return self.response_models
+
+    def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+        """
+        Formats the raw event details from kafka message and subscriber details.
+        :param subscribers: A list of subscribers whom the event needs to be notified.
+        :param event_details: A dict containing all meta data of event.
+        :return:
+        """
+        notification_id = str(uuid4())
+        event_timestamp = time.time()
+        event_operation = event_details["command"]
+        for subscriber in subscribers:
+            subscriber["id"] = notification_id
+            subscriber["timeStamp"] = event_timestamp
+            subscriber["subscriptionId"] = subscriber["reference"]
+            subscriber["operation"] = event_operation
+            del subscriber["reference"]
+            del subscriber["_id"]
+            subscriber.update(event_details["params"])
+        return subscribers
+
+    def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
+                        event_details: dict) -> list:
+        """
+        Queries database and returns list of subscribers.
+        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
+        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
+        :param command: the command for event.
+        :param op_state: the operation state of VNF.
+        :param event_details: dict containing raw data of event occurred.
+        :return: List of interested subscribers for occurred event.
+        """
+        notification_type = [
+            "VnfIdentifierCreationNotification",
+            "VnfLcmOperationOccurrenceNotification",
+            "VnfIdentifierDeletionNotification"
+        ]
+        filter_q = {
+            "identifier": [vnfd_id, vnf_instance_id],
+            "operationStates": ["ANY"],
+            "operationTypes": ["ANY"],
+            "notificationType": notification_type
+        }
+        if op_state:
+            filter_q["operationStates"].append(op_state)
+        if command:
+            filter_q["operationTypes"].append(command)
+        subscribers = []
+        try:
+            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
+            subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
+        except Exception as e:
+            error_text = type(e).__name__ + ": " + str(e)
+            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
+        finally:
+            return subscribers
+
+
 class NsdNotification(NotificationBase):
     def __init__(self, db):
         """
index 93c91c5..947f0b7 100644 (file)
@@ -133,6 +133,31 @@ class NewVnfLcmOp(BaseMethod):
             }
         return formatted_indata
 
+    def notify_operation(self, session, _id, lcm_operation, op_id):
+        """
+        Formats the operation message params and sends to kafka
+        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+        :param _id: vnf instance id
+        :param lcm_operation: lcm operation type of a VNF (instantiate, scale, terminate)
+        :param op_id: lcm operation id of a VNF
+        :return: None
+        """
+        vnfInstanceId = _id
+        operation = lcm_operation
+        nslcmop_rec = self.nslcmoptopic.show(session, op_id)
+        operation_status = nslcmop_rec["operationState"]
+        vnfr = self.vnfrtopic.show(session, vnfInstanceId)
+        links = {"self": "/osm/vnflcm/v1/vnf_lcm_op_occs/" + op_id,
+                 "vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfInstanceId}
+        params = {"vnfdId": vnfr["vnfd-ref"],
+                  "vnfInstanceId": vnfInstanceId,
+                  "operationState": operation_status,
+                  "vnfLcmOpOccId": op_id,
+                  "_links": links
+                  }
+        self.msg.write("vnf", operation, params)
+        return None
+
     def action(self, rollback, session, indata=None, kwargs=None, headers=None):
         """
         Creates an new lcm operation.
@@ -149,6 +174,7 @@ class NewVnfLcmOp(BaseMethod):
         indata["vnfInstanceId"] = vnfr.get("nsr-id-ref")
         indata = self.__get_formatted_indata(session, indata)
         op_id, _ = self.nslcmoptopic.new(rollback, session, indata, kwargs, headers)
+        self.notify_operation(session, vnfInstanceId, lcm_operation, op_id)
         return op_id, _
 
 
index a41f6d5..a6a57fc 100644 (file)
@@ -224,6 +224,11 @@ class NewVnfInstance(BaseMethod):
         nsr_id, _ = self.__create_nsr(rollback, session, formatted_indata, kwargs, headers)
         nsr = self.nsrtopic.show(session, nsr_id)
         vnfr_id =  nsr['constituent-vnfr-ref'][0]
+        if vnfr_id:
+            links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfr_id}
+            indata["vnfInstanceId"] = vnfr_id
+            indata["_links"] = links
+            self.msg.write("vnf", "create", indata)
         return vnfr_id, None
 
     def action(self, rollback, session, indata=None, kwargs=None, headers=None):
@@ -305,5 +310,10 @@ class DeleteVnfInstance(BaseMethod):
         ns_id = vnfr.get("nsr-id-ref")
         nsr = self.nsrtopic.show(session, ns_id)
         nsd_to_del = nsr['nsd']['_id']
+        links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + _id}
+        params = {"vnfdId": vnfr["vnfd-ref"],
+                  "vnfInstanceId": _id,
+                  "_links": links}
+        self.msg.write("vnf", "delete", params)
         self.nsrtopic.delete(session, ns_id, dry_run, not_send_msg)
         return self.nsdtopic.delete(session, nsd_to_del, dry_run, not_send_msg)
diff --git a/osm_nbi/osm_vnfm/vnf_subscription.py b/osm_nbi/osm_vnfm/vnf_subscription.py
new file mode 100644 (file)
index 0000000..5371a44
--- /dev/null
@@ -0,0 +1,65 @@
+# Copyright 2021 Selvi Jayaraman (Tata Elxsi)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__author__ = "Selvi Jayaraman <selvi.j@tataelxsi.co.in>"
+
+from osm_nbi.subscription_topics import CommonSubscriptions
+from osm_nbi.validation import vnf_subscription
+
+class VnflcmSubscriptionsTopic(CommonSubscriptions):
+    schema_new = vnf_subscription
+    def _subscription_mapper(self, _id, data, table):
+        """
+        Performs data transformation on subscription request
+        :param _id: subscription reference id
+        :param data: data to be transformed
+        :param table: table in which transformed data are inserted
+        """
+        formatted_data = []
+        formed_data = {
+            "reference": data.get("_id"),
+            "CallbackUri": data.get("CallbackUri")
+        }
+        if data.get("authentication"):
+            formed_data.update({"authentication": data.get("authentication")})
+        if data.get("filter"):
+            if data["filter"].get("VnfInstanceSubscriptionFilter"):
+                key = list(data["filter"]["VnfInstanceSubscriptionFilter"].keys())[0]
+                identifier = data["filter"]["VnfInstanceSubscriptionFilter"][key]
+                formed_data.update({"identifier": identifier})
+            if data["filter"].get("notificationTypes"):
+                for elem in data["filter"].get("notificationTypes"):
+                    update_dict = formed_data.copy()
+                    update_dict["notificationType"] = elem
+                    if elem == "VnfIdentifierCreationNotification":
+                        update_dict["operationTypes"] = "CREATE"
+                        update_dict["operationStates"] = "ANY"
+                        formatted_data.append(update_dict)
+                    elif elem == "VnfIdentifierDeletionNotification":
+                        update_dict["operationTypes"] = "DELETE"
+                        update_dict["operationStates"] = "ANY"
+                        formatted_data.append(update_dict)
+                    elif elem == "VnfLcmOperationOccurrenceNotification":
+                        if "operationTypes" in data["filter"].keys():
+                            update_dict["operationTypes"] = data["filter"]["operationTypes"]
+                        else:
+                            update_dict["operationTypes"] = "ANY"
+                        if "operationStates" in data["filter"].keys():
+                            update_dict["operationStates"] = data["filter"]["operationStates"]
+                        else:
+                            update_dict["operationStates"] = "ANY"
+                        formatted_data.append(update_dict)
+        self.db.create_list(table, formatted_data)
+        return None
index 6810ccd..1f172dd 100644 (file)
@@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_nbi.engine import EngineException
-from osm_nbi.notifications import NsLcmNotification
+from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -70,6 +70,7 @@ class SubscriptionThread(threading.Thread):
             "method": "delete",
         }
         self.nslcm = None
+        self.vnflcm = None
 
     async def start_kafka(self):
         # timeout_wait_for_kafka = 3*60
@@ -84,6 +85,7 @@ class SubscriptionThread(threading.Thread):
                 )
                 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
                 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
+                await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop)
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
@@ -104,7 +106,7 @@ class SubscriptionThread(threading.Thread):
                     self.logger.debug("Starting non-admin subscription task")
                     self.aiomain_task = asyncio.ensure_future(
                         self.msg.aioread(
-                            ("ns", "nsi"),
+                            ("ns", "nsi", "vnf"),
                             loop=self.loop,
                             aiocallback=self._msg_callback,
                         ),
@@ -178,6 +180,7 @@ class SubscriptionThread(threading.Thread):
                         )
                     )
             self.nslcm = NsLcmNotification(self.db)
+            self.vnflcm = VnfLcmNotification(self.db)
         except (DbException, MsgException) as e:
             raise SubscriptionException(str(e), http_code=e.http_code)
 
@@ -272,6 +275,33 @@ class SubscriptionThread(threading.Thread):
                     self.logger.debug(
                         "Message can not be used for notification of nslcm"
                     )
+            elif topic == "vnf":
+                if isinstance(params, dict):
+                    vnfd_id = params["vnfdId"]
+                    vnf_instance_id = params["vnfInstanceId"]
+                    if command == "create" or command == "delete":
+                        op_state = command
+                    else:
+                        op_state = params["operationState"]
+                    event_details = {
+                            "topic": topic,
+                            "command": command.upper(),
+                            "params": params,
+                            }
+                    subscribers = self.vnflcm.get_subscribers(
+                            vnfd_id,
+                            vnf_instance_id,
+                            command.upper(),
+                            op_state,
+                            event_details
+                            )
+                    if subscribers:
+                        asyncio.ensure_future(
+                                self.vnflcm.send_notifications(
+                                    subscribers, loop=self.loop
+                                ),
+                                loop=self.loop
+                            )
             elif topic == "nsi":
                 if command == "terminated" and params["operationState"] in (
                     "COMPLETED",
index 3373b84..3fec364 100644 (file)
@@ -1309,6 +1309,63 @@ subscription = {
     "required": ["CallbackUri"],
 }
 
+vnflcmsub_schema = {
+    "title": "vnflcmsubscription input schema",
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "VnfInstanceSubscriptionFilter": {
+            "type": "object",
+            "properties": {
+                "vnfdIds": {"type": "array"},
+                "vnfInstanceIds": {"type": "array"},
+            },
+        },
+        "notificationTypes": {
+            "type": "array",
+            "items": {
+                "enum": [
+                    "VnfIdentifierCreationNotification",
+                    "VnfLcmOperationOccurrenceNotification",
+                    "VnfIdentifierDeletionNotification"
+                    ]
+            }
+        },
+        "operationTypes": {
+            "type": "array",
+            "items": {
+                "enum": [
+                    "INSTANTIATE", "SCALE", "SCALE_TO_LEVEL", "CHANGE_FLAVOUR", "TERMINATE",
+                    "HEAL", "OPERATE", "CHANGE_EXT_CONN", "MODIFY_INFO", "CREATE_SNAPSHOT",
+                    "REVERT_TO_SNAPSHOT", "CHANGE_VNFPKG"
+                    ]
+            }
+        },
+        "operationStates": {
+            "type": "array",
+            "items": {
+                "enum": [
+                    "STARTING", "PROCESSING", "COMPLETED", "FAILED_TEMP", "FAILED",
+                    "ROLLING_BACK", "ROLLED_BACK"
+                    ]
+            }
+        }
+    },
+    "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"]
+ }
+
+vnf_subscription = {
+    "title": "vnf subscription input schema",
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "filter": vnflcmsub_schema,
+        "CallbackUri": description_schema,
+        "authentication": authentication_schema
+    },
+    "required": ["filter", "CallbackUri"]
+}
+
 
 class ValidationError(Exception):
     def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):