X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fsubscriptions.py;h=846e7d3c0306bfe960899abcfbd09d3991915477;hp=b178e5bae8dc8b5c89a1bf639e8f84c5c317eaa9;hb=HEAD;hpb=4cd875d2a38488b5e717258d548eeb8e557ec9a8 diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index b178e5b..846e7d3 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -53,7 +53,6 @@ class SubscriptionThread(threading.Thread): self.db = None self.msg = None self.engine = engine - self.loop = None self.logger = logging.getLogger("nbi.subscriptions") self.aiomain_task_admin = ( None # asyncio task for receiving admin actions from kafka bus @@ -81,41 +80,38 @@ class SubscriptionThread(threading.Thread): # created. # Before subscribe, send dummy messages await self.msg.aiowrite( - "admin", "echo", "dummy message", loop=self.loop + "admin", + "echo", + "dummy message", ) - await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop) - await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop) - await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop) + await self.msg.aiowrite("ns", "echo", "dummy message") + await self.msg.aiowrite("nsi", "echo", "dummy message") + await self.msg.aiowrite("vnf", "echo", "dummy message") if not kafka_working: self.logger.critical("kafka is working again") kafka_working = True if not self.aiomain_task_admin: - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) self.logger.debug("Starting admin subscription task") self.aiomain_task_admin = asyncio.ensure_future( self.msg.aioread( ("admin",), - loop=self.loop, group_id=False, aiocallback=self._msg_callback, ), - loop=self.loop, ) if not self.aiomain_task: - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) self.logger.debug("Starting non-admin subscription task") self.aiomain_task = asyncio.ensure_future( self.msg.aioread( ("ns", "nsi", "vnf"), - loop=self.loop, aiocallback=self._msg_callback, ), - loop=self.loop, ) done, _ = await asyncio.wait( [self.aiomain_task, self.aiomain_task_admin], timeout=None, - loop=self.loop, return_when=asyncio.FIRST_COMPLETED, ) try: @@ -142,14 +138,13 @@ class SubscriptionThread(threading.Thread): "Error accessing kafka '{}'. Retrying ...".format(e) ) kafka_working = False - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) def run(self): """ Start of the thread :return: None """ - self.loop = asyncio.new_event_loop() try: if not self.db: if self.config["database"]["driver"] == "mongo": @@ -166,7 +161,6 @@ class SubscriptionThread(threading.Thread): ) if not self.msg: config_msg = self.config["message"].copy() - config_msg["loop"] = self.loop if config_msg["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_msg) @@ -187,11 +181,7 @@ class SubscriptionThread(threading.Thread): self.logger.debug("Starting") while not self.to_terminate: try: - self.loop.run_until_complete( - asyncio.ensure_future(self.start_kafka(), loop=self.loop) - ) - # except asyncio.CancelledError: - # break # if cancelled it should end, breaking loop + asyncio.run(self.start_kafka()) except Exception as e: if not self.to_terminate: self.logger.exception( @@ -200,7 +190,6 @@ class SubscriptionThread(threading.Thread): self.logger.debug("Finishing") self._stop() - self.loop.close() async def _msg_callback(self, topic, command, params): """ @@ -265,10 +254,7 @@ class SubscriptionThread(threading.Thread): # self.logger.debug(subscribers) if subscribers: asyncio.ensure_future( - self.nslcm.send_notifications( - subscribers, loop=self.loop - ), - loop=self.loop, + self.nslcm.send_notifications(subscribers), ) else: self.logger.debug( @@ -296,8 +282,7 @@ class SubscriptionThread(threading.Thread): ) if subscribers: asyncio.ensure_future( - self.vnflcm.send_notifications(subscribers, loop=self.loop), - loop=self.loop, + self.vnflcm.send_notifications(subscribers), ) elif topic == "nsi": if command == "terminated" and params["operationState"] in ( @@ -343,7 +328,7 @@ class SubscriptionThread(threading.Thread): # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, # but content to be written is stored at msg_to_send for msg in msg_to_send: - await self.msg.aiowrite(*msg, loop=self.loop) + await self.msg.aiowrite(*msg) except (EngineException, DbException, MsgException) as e: self.logger.error( "Error while processing topic={} command={}: {}".format( @@ -379,6 +364,8 @@ class SubscriptionThread(threading.Thread): """ self.to_terminate = True if self.aiomain_task: - self.loop.call_soon_threadsafe(self.aiomain_task.cancel) + asyncio.get_event_loop().call_soon_threadsafe(self.aiomain_task.cancel) if self.aiomain_task_admin: - self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel) + asyncio.get_event_loop().call_soon_threadsafe( + self.aiomain_task_admin.cancel + )