X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fsubscriptions.py;h=846e7d3c0306bfe960899abcfbd09d3991915477;hp=6810ccd6a0a37a159e37cbda9d5618f0c8480643;hb=HEAD;hpb=4568a372eb5a204e04d917213de03ec51f9110c1 diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index 6810ccd..846e7d3 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka from osm_common.dbbase import DbException from osm_common.msgbase import MsgException from osm_nbi.engine import EngineException -from osm_nbi.notifications import NsLcmNotification +from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification __author__ = "Alfonso Tierno " @@ -53,7 +53,6 @@ class SubscriptionThread(threading.Thread): self.db = None self.msg = None self.engine = engine - self.loop = None self.logger = logging.getLogger("nbi.subscriptions") self.aiomain_task_admin = ( None # asyncio task for receiving admin actions from kafka bus @@ -70,6 +69,7 @@ class SubscriptionThread(threading.Thread): "method": "delete", } self.nslcm = None + self.vnflcm = None async def start_kafka(self): # timeout_wait_for_kafka = 3*60 @@ -80,40 +80,38 @@ class SubscriptionThread(threading.Thread): # created. # Before subscribe, send dummy messages await self.msg.aiowrite( - "admin", "echo", "dummy message", loop=self.loop + "admin", + "echo", + "dummy message", ) - await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop) - await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop) + await self.msg.aiowrite("ns", "echo", "dummy message") + await self.msg.aiowrite("nsi", "echo", "dummy message") + await self.msg.aiowrite("vnf", "echo", "dummy message") if not kafka_working: self.logger.critical("kafka is working again") kafka_working = True if not self.aiomain_task_admin: - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) self.logger.debug("Starting admin subscription task") self.aiomain_task_admin = asyncio.ensure_future( self.msg.aioread( ("admin",), - loop=self.loop, group_id=False, aiocallback=self._msg_callback, ), - loop=self.loop, ) if not self.aiomain_task: - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) self.logger.debug("Starting non-admin subscription task") self.aiomain_task = asyncio.ensure_future( self.msg.aioread( - ("ns", "nsi"), - loop=self.loop, + ("ns", "nsi", "vnf"), aiocallback=self._msg_callback, ), - loop=self.loop, ) done, _ = await asyncio.wait( [self.aiomain_task, self.aiomain_task_admin], timeout=None, - loop=self.loop, return_when=asyncio.FIRST_COMPLETED, ) try: @@ -140,14 +138,13 @@ class SubscriptionThread(threading.Thread): "Error accessing kafka '{}'. Retrying ...".format(e) ) kafka_working = False - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) def run(self): """ Start of the thread :return: None """ - self.loop = asyncio.new_event_loop() try: if not self.db: if self.config["database"]["driver"] == "mongo": @@ -164,7 +161,6 @@ class SubscriptionThread(threading.Thread): ) if not self.msg: config_msg = self.config["message"].copy() - config_msg["loop"] = self.loop if config_msg["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_msg) @@ -178,18 +174,14 @@ class SubscriptionThread(threading.Thread): ) ) self.nslcm = NsLcmNotification(self.db) + self.vnflcm = VnfLcmNotification(self.db) except (DbException, MsgException) as e: raise SubscriptionException(str(e), http_code=e.http_code) self.logger.debug("Starting") while not self.to_terminate: try: - - self.loop.run_until_complete( - asyncio.ensure_future(self.start_kafka(), loop=self.loop) - ) - # except asyncio.CancelledError: - # break # if cancelled it should end, breaking loop + asyncio.run(self.start_kafka()) except Exception as e: if not self.to_terminate: self.logger.exception( @@ -198,7 +190,6 @@ class SubscriptionThread(threading.Thread): self.logger.debug("Finishing") self._stop() - self.loop.close() async def _msg_callback(self, topic, command, params): """ @@ -263,15 +254,36 @@ class SubscriptionThread(threading.Thread): # self.logger.debug(subscribers) if subscribers: asyncio.ensure_future( - self.nslcm.send_notifications( - subscribers, loop=self.loop - ), - loop=self.loop, + self.nslcm.send_notifications(subscribers), ) else: self.logger.debug( "Message can not be used for notification of nslcm" ) + elif topic == "vnf": + if isinstance(params, dict): + vnfd_id = params["vnfdId"] + vnf_instance_id = params["vnfInstanceId"] + if command == "create" or command == "delete": + op_state = command + else: + op_state = params["operationState"] + event_details = { + "topic": topic, + "command": command.upper(), + "params": params, + } + subscribers = self.vnflcm.get_subscribers( + vnfd_id, + vnf_instance_id, + command.upper(), + op_state, + event_details, + ) + if subscribers: + asyncio.ensure_future( + self.vnflcm.send_notifications(subscribers), + ) elif topic == "nsi": if command == "terminated" and params["operationState"] in ( "COMPLETED", @@ -316,7 +328,7 @@ class SubscriptionThread(threading.Thread): # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, # but content to be written is stored at msg_to_send for msg in msg_to_send: - await self.msg.aiowrite(*msg, loop=self.loop) + await self.msg.aiowrite(*msg) except (EngineException, DbException, MsgException) as e: self.logger.error( "Error while processing topic={} command={}: {}".format( @@ -352,6 +364,8 @@ class SubscriptionThread(threading.Thread): """ self.to_terminate = True if self.aiomain_task: - self.loop.call_soon_threadsafe(self.aiomain_task.cancel) + asyncio.get_event_loop().call_soon_threadsafe(self.aiomain_task.cancel) if self.aiomain_task_admin: - self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel) + asyncio.get_event_loop().call_soon_threadsafe( + self.aiomain_task_admin.cancel + )