from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
from osm_nbi.pmjobs_topics import PmJobsTopic
from osm_nbi.subscription_topics import NslcmSubscriptionsTopic
+from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic
from base64 import b64encode
from os import urandom # , path
from threading import Lock
"nslcm_subscriptions": NslcmSubscriptionsTopic,
"vnf_instances": VnfInstances,
"vnflcmops": VnfLcmOpTopic,
+ "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
# [NEW_TOPIC]: add an entry here
# "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
}
"ROLE_PERMISSION": "vnf_instances:opps:id:"
},
},
+ "subscriptions": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:id:"
+ }
+ },
}
},
"nst": {
:param event_details: dict containing raw data of event occured.
:return: List of interested subscribers for occurred event.
"""
+ notification_type = [
+ "NsLcmOperationOccurrenceNotification",
+ "NsChangeNotification",
+ "NsIdentifierCreationNotification",
+ "NsIdentifierDeletionNotification"
+ ]
filter_q = {
"identifier": [nsd_id, ns_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- }
+ "notificationType": notification_type
+ }
if op_state:
filter_q["operationStates"].append(op_state)
if command:
return subscribers
+class VnfLcmNotification(NotificationBase):
+ # SOL003 response model for vnflcm notifications
+ response_models = {
+ "VnfLcmOperationOccurrenceNotification": {
+ "id",
+ "notificationType",
+ "subscriptionId",
+ "timeStamp",
+ "notificationStatus",
+ "operationState",
+ "vnfInstanceId",
+ "operation",
+ "isAutomaticInvocation",
+ "vnfLcmOpOccId",
+ "affectedVnfcs",
+ "affectedVirtualLinks",
+ "affectedExtLinkPorts",
+ "affectedVirtualStorages",
+ "changedInfo",
+ "changedExtConnectivity",
+ "modificationsTriggeredByVnfPkgChange",
+ "error",
+ "_links"
+ },
+ "VnfIdentifierCreationNotification": {
+ "id",
+ "notificationType",
+ "subscriptionId",
+ "timeStamp",
+ "vnfInstanceId",
+ "_links"
+ },
+ "VnfIdentifierDeletionNotification": {
+ "id",
+ "notificationType",
+ "subscriptionId",
+ "timeStamp",
+ "vnfInstanceId",
+ "_links"
+ },
+ }
+
+ def __init__(self, db) -> None:
+ """
+ Constructor of VnfLcmNotification class.
+ :param db: Database handler.
+ """
+ super().__init__(db)
+ self.subscriber_collection = "mapped_subscriptions"
+
+ def get_models(self) -> dict:
+ """
+ Returns the SOL003 model of notification class
+ :param None
+ :return: dict of SOL003 data model
+ """
+ return self.response_models
+
+ def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+ """
+ Formats the raw event details from kafka message and subscriber details.
+ :param subscribers: A list of subscribers whom the event needs to be notified.
+ :param event_details: A dict containing all meta data of event.
+ :return:
+ """
+ notification_id = str(uuid4())
+ event_timestamp = time.time()
+ event_operation = event_details["command"]
+ for subscriber in subscribers:
+ subscriber["id"] = notification_id
+ subscriber["timeStamp"] = event_timestamp
+ subscriber["subscriptionId"] = subscriber["reference"]
+ subscriber["operation"] = event_operation
+ del subscriber["reference"]
+ del subscriber["_id"]
+ subscriber.update(event_details["params"])
+ return subscribers
+
+ def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
+ event_details: dict) -> list:
+ """
+ Queries database and returns list of subscribers.
+ :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
+ :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
+ :param command: the command for event.
+ :param op_state: the operation state of VNF.
+ :param event_details: dict containing raw data of event occurred.
+ :return: List of interested subscribers for occurred event.
+ """
+ notification_type = [
+ "VnfIdentifierCreationNotification",
+ "VnfLcmOperationOccurrenceNotification",
+ "VnfIdentifierDeletionNotification"
+ ]
+ filter_q = {
+ "identifier": [vnfd_id, vnf_instance_id],
+ "operationStates": ["ANY"],
+ "operationTypes": ["ANY"],
+ "notificationType": notification_type
+ }
+ if op_state:
+ filter_q["operationStates"].append(op_state)
+ if command:
+ filter_q["operationTypes"].append(command)
+ subscribers = []
+ try:
+ subscribers = self.db.get_list(self.subscriber_collection, filter_q)
+ subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
+ except Exception as e:
+ error_text = type(e).__name__ + ": " + str(e)
+ self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
+ finally:
+ return subscribers
+
+
class NsdNotification(NotificationBase):
def __init__(self, db):
"""
}
return formatted_indata
+ def notify_operation(self, session, _id, lcm_operation, op_id):
+ """
+ Formats the operation message params and sends to kafka
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: vnf instance id
+ :param lcm_operation: lcm operation type of a VNF (instantiate, scale, terminate)
+ :param op_id: lcm operation id of a VNF
+ :return: None
+ """
+ vnfInstanceId = _id
+ operation = lcm_operation
+ nslcmop_rec = self.nslcmoptopic.show(session, op_id)
+ operation_status = nslcmop_rec["operationState"]
+ vnfr = self.vnfrtopic.show(session, vnfInstanceId)
+ links = {"self": "/osm/vnflcm/v1/vnf_lcm_op_occs/" + op_id,
+ "vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfInstanceId}
+ params = {"vnfdId": vnfr["vnfd-ref"],
+ "vnfInstanceId": vnfInstanceId,
+ "operationState": operation_status,
+ "vnfLcmOpOccId": op_id,
+ "_links": links
+ }
+ self.msg.write("vnf", operation, params)
+ return None
+
def action(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
Creates an new lcm operation.
indata["vnfInstanceId"] = vnfr.get("nsr-id-ref")
indata = self.__get_formatted_indata(session, indata)
op_id, _ = self.nslcmoptopic.new(rollback, session, indata, kwargs, headers)
+ self.notify_operation(session, vnfInstanceId, lcm_operation, op_id)
return op_id, _
nsr_id, _ = self.__create_nsr(rollback, session, formatted_indata, kwargs, headers)
nsr = self.nsrtopic.show(session, nsr_id)
vnfr_id = nsr['constituent-vnfr-ref'][0]
+ if vnfr_id:
+ links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfr_id}
+ indata["vnfInstanceId"] = vnfr_id
+ indata["_links"] = links
+ self.msg.write("vnf", "create", indata)
return vnfr_id, None
def action(self, rollback, session, indata=None, kwargs=None, headers=None):
ns_id = vnfr.get("nsr-id-ref")
nsr = self.nsrtopic.show(session, ns_id)
nsd_to_del = nsr['nsd']['_id']
+ links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + _id}
+ params = {"vnfdId": vnfr["vnfd-ref"],
+ "vnfInstanceId": _id,
+ "_links": links}
+ self.msg.write("vnf", "delete", params)
self.nsrtopic.delete(session, ns_id, dry_run, not_send_msg)
return self.nsdtopic.delete(session, nsd_to_del, dry_run, not_send_msg)
--- /dev/null
+# Copyright 2021 Selvi Jayaraman (Tata Elxsi)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__author__ = "Selvi Jayaraman <selvi.j@tataelxsi.co.in>"
+
+from osm_nbi.subscription_topics import CommonSubscriptions
+from osm_nbi.validation import vnf_subscription
+
+class VnflcmSubscriptionsTopic(CommonSubscriptions):
+ schema_new = vnf_subscription
+ def _subscription_mapper(self, _id, data, table):
+ """
+ Performs data transformation on subscription request
+ :param _id: subscription reference id
+ :param data: data to be transformed
+ :param table: table in which transformed data are inserted
+ """
+ formatted_data = []
+ formed_data = {
+ "reference": data.get("_id"),
+ "CallbackUri": data.get("CallbackUri")
+ }
+ if data.get("authentication"):
+ formed_data.update({"authentication": data.get("authentication")})
+ if data.get("filter"):
+ if data["filter"].get("VnfInstanceSubscriptionFilter"):
+ key = list(data["filter"]["VnfInstanceSubscriptionFilter"].keys())[0]
+ identifier = data["filter"]["VnfInstanceSubscriptionFilter"][key]
+ formed_data.update({"identifier": identifier})
+ if data["filter"].get("notificationTypes"):
+ for elem in data["filter"].get("notificationTypes"):
+ update_dict = formed_data.copy()
+ update_dict["notificationType"] = elem
+ if elem == "VnfIdentifierCreationNotification":
+ update_dict["operationTypes"] = "CREATE"
+ update_dict["operationStates"] = "ANY"
+ formatted_data.append(update_dict)
+ elif elem == "VnfIdentifierDeletionNotification":
+ update_dict["operationTypes"] = "DELETE"
+ update_dict["operationStates"] = "ANY"
+ formatted_data.append(update_dict)
+ elif elem == "VnfLcmOperationOccurrenceNotification":
+ if "operationTypes" in data["filter"].keys():
+ update_dict["operationTypes"] = data["filter"]["operationTypes"]
+ else:
+ update_dict["operationTypes"] = "ANY"
+ if "operationStates" in data["filter"].keys():
+ update_dict["operationStates"] = data["filter"]["operationStates"]
+ else:
+ update_dict["operationStates"] = "ANY"
+ formatted_data.append(update_dict)
+ self.db.create_list(table, formatted_data)
+ return None
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
from osm_nbi.engine import EngineException
-from osm_nbi.notifications import NsLcmNotification
+from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"method": "delete",
}
self.nslcm = None
+ self.vnflcm = None
async def start_kafka(self):
# timeout_wait_for_kafka = 3*60
)
await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
+ await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop)
if not kafka_working:
self.logger.critical("kafka is working again")
kafka_working = True
self.logger.debug("Starting non-admin subscription task")
self.aiomain_task = asyncio.ensure_future(
self.msg.aioread(
- ("ns", "nsi"),
+ ("ns", "nsi", "vnf"),
loop=self.loop,
aiocallback=self._msg_callback,
),
)
)
self.nslcm = NsLcmNotification(self.db)
+ self.vnflcm = VnfLcmNotification(self.db)
except (DbException, MsgException) as e:
raise SubscriptionException(str(e), http_code=e.http_code)
self.logger.debug(
"Message can not be used for notification of nslcm"
)
+ elif topic == "vnf":
+ if isinstance(params, dict):
+ vnfd_id = params["vnfdId"]
+ vnf_instance_id = params["vnfInstanceId"]
+ if command == "create" or command == "delete":
+ op_state = command
+ else:
+ op_state = params["operationState"]
+ event_details = {
+ "topic": topic,
+ "command": command.upper(),
+ "params": params,
+ }
+ subscribers = self.vnflcm.get_subscribers(
+ vnfd_id,
+ vnf_instance_id,
+ command.upper(),
+ op_state,
+ event_details
+ )
+ if subscribers:
+ asyncio.ensure_future(
+ self.vnflcm.send_notifications(
+ subscribers, loop=self.loop
+ ),
+ loop=self.loop
+ )
elif topic == "nsi":
if command == "terminated" and params["operationState"] in (
"COMPLETED",
"required": ["CallbackUri"],
}
+vnflcmsub_schema = {
+ "title": "vnflcmsubscription input schema",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "type": "object",
+ "properties": {
+ "VnfInstanceSubscriptionFilter": {
+ "type": "object",
+ "properties": {
+ "vnfdIds": {"type": "array"},
+ "vnfInstanceIds": {"type": "array"},
+ },
+ },
+ "notificationTypes": {
+ "type": "array",
+ "items": {
+ "enum": [
+ "VnfIdentifierCreationNotification",
+ "VnfLcmOperationOccurrenceNotification",
+ "VnfIdentifierDeletionNotification"
+ ]
+ }
+ },
+ "operationTypes": {
+ "type": "array",
+ "items": {
+ "enum": [
+ "INSTANTIATE", "SCALE", "SCALE_TO_LEVEL", "CHANGE_FLAVOUR", "TERMINATE",
+ "HEAL", "OPERATE", "CHANGE_EXT_CONN", "MODIFY_INFO", "CREATE_SNAPSHOT",
+ "REVERT_TO_SNAPSHOT", "CHANGE_VNFPKG"
+ ]
+ }
+ },
+ "operationStates": {
+ "type": "array",
+ "items": {
+ "enum": [
+ "STARTING", "PROCESSING", "COMPLETED", "FAILED_TEMP", "FAILED",
+ "ROLLING_BACK", "ROLLED_BACK"
+ ]
+ }
+ }
+ },
+ "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"]
+ }
+
+vnf_subscription = {
+ "title": "vnf subscription input schema",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "type": "object",
+ "properties": {
+ "filter": vnflcmsub_schema,
+ "CallbackUri": description_schema,
+ "authentication": authentication_schema
+ },
+ "required": ["filter", "CallbackUri"]
+}
+
class ValidationError(Exception):
def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):