X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_nbi%2Fsubscriptions.py;h=b178e5bae8dc8b5c89a1bf639e8f84c5c317eaa9;hb=9af2a4785d3a77772fd205aa572cc6a64d4d1003;hp=6810ccd6a0a37a159e37cbda9d5618f0c8480643;hpb=4568a372eb5a204e04d917213de03ec51f9110c1;p=osm%2FNBI.git diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index 6810ccd..b178e5b 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka from osm_common.dbbase import DbException from osm_common.msgbase import MsgException from osm_nbi.engine import EngineException -from osm_nbi.notifications import NsLcmNotification +from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification __author__ = "Alfonso Tierno " @@ -70,6 +70,7 @@ class SubscriptionThread(threading.Thread): "method": "delete", } self.nslcm = None + self.vnflcm = None async def start_kafka(self): # timeout_wait_for_kafka = 3*60 @@ -84,6 +85,7 @@ class SubscriptionThread(threading.Thread): ) await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop) await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop) + await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop) if not kafka_working: self.logger.critical("kafka is working again") kafka_working = True @@ -104,7 +106,7 @@ class SubscriptionThread(threading.Thread): self.logger.debug("Starting non-admin subscription task") self.aiomain_task = asyncio.ensure_future( self.msg.aioread( - ("ns", "nsi"), + ("ns", "nsi", "vnf"), loop=self.loop, aiocallback=self._msg_callback, ), @@ -178,13 +180,13 @@ class SubscriptionThread(threading.Thread): ) ) self.nslcm = NsLcmNotification(self.db) + self.vnflcm = VnfLcmNotification(self.db) except (DbException, MsgException) as e: raise SubscriptionException(str(e), http_code=e.http_code) self.logger.debug("Starting") while not self.to_terminate: try: - self.loop.run_until_complete( asyncio.ensure_future(self.start_kafka(), loop=self.loop) ) @@ -272,6 +274,31 @@ class SubscriptionThread(threading.Thread): self.logger.debug( "Message can not be used for notification of nslcm" ) + elif topic == "vnf": + if isinstance(params, dict): + vnfd_id = params["vnfdId"] + vnf_instance_id = params["vnfInstanceId"] + if command == "create" or command == "delete": + op_state = command + else: + op_state = params["operationState"] + event_details = { + "topic": topic, + "command": command.upper(), + "params": params, + } + subscribers = self.vnflcm.get_subscribers( + vnfd_id, + vnf_instance_id, + command.upper(), + op_state, + event_details, + ) + if subscribers: + asyncio.ensure_future( + self.vnflcm.send_notifications(subscribers, loop=self.loop), + loop=self.loop, + ) elif topic == "nsi": if command == "terminated" and params["operationState"] in ( "COMPLETED",