X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fvim_admin.py;h=e34cce4a5216e18a72b11b3faabef5977b1095cd;hb=HEAD;hp=2582ee2a480e38bcb195b69edb7942b3b7c96fd0;hpb=049cbb1b256805f589c24776dcf092c77fefec6a;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py index 2582ee2a..e34cce4a 100644 --- a/NG-RO/osm_ng_ro/vim_admin.py +++ b/NG-RO/osm_ng_ro/vim_admin.py @@ -39,7 +39,6 @@ class VimAdminException(Exception): class LockRenew: - renew_list = [] # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to # be renewed. The time order is achieved as it is appended at the end @@ -52,15 +51,13 @@ class LockRenew: self.config = config self.logger = logger self.to_terminate = False - self.loop = None self.db = None self.task_locked_time = config["global"]["task_locked_time"] self.task_relock_time = config["global"]["task_relock_time"] self.task_max_locked_time = config["global"]["task_max_locked_time"] - def start(self, db, loop): + def start(self, db): self.db = db - self.loop = loop @staticmethod def add_lock_object(database_table, database_object, thread_object): @@ -90,9 +87,7 @@ class LockRenew: async def renew_locks(self): while not self.to_terminate: if not self.renew_list: - await asyncio.sleep( - self.task_locked_time - self.task_relock_time, loop=self.loop - ) + await asyncio.sleep(self.task_locked_time - self.task_relock_time) continue lock_object = self.renew_list[0] @@ -152,7 +147,7 @@ class LockRenew: ) else: # wait until it is time to re-lock it - await asyncio.sleep(time_to_relock, loop=self.loop) + await asyncio.sleep(time_to_relock) def stop(self): # unlock all locked items @@ -269,38 +264,31 @@ class VimAdminThread(threading.Thread): self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM self.engine.unload_unused_vims() - await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop) + await asyncio.sleep(self.MAX_TIME_UNATTENDED) async def aiomain(self): kafka_working = True while not self.to_terminate: try: if not self.aiomain_task_kafka: - # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop) for kafka_topic in self.kafka_topics: - await self.msg.aiowrite( - kafka_topic, "echo", "dummy message", loop=self.loop - ) + await self.msg.aiowrite(kafka_topic, "echo", "dummy message") kafka_working = True self.logger.debug("Starting vim_account subscription task") self.aiomain_task_kafka = asyncio.ensure_future( self.msg.aioread( self.kafka_topics, - loop=self.loop, group_id=False, aiocallback=self._msg_callback, ), - loop=self.loop, ) if not self.aiomain_task_vim: - self.aiomain_task_vim = asyncio.ensure_future( - self.vim_watcher(), loop=self.loop - ) + self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher()) if not self.aiomain_task_renew_lock: self.aiomain_task_renew_lock = asyncio.ensure_future( - self.lock_renew.renew_locks(), loop=self.loop + self.lock_renew.renew_locks() ) done, _ = await asyncio.wait( @@ -310,7 +298,6 @@ class VimAdminThread(threading.Thread): self.aiomain_task_renew_lock, ], timeout=None, - loop=self.loop, return_when=asyncio.FIRST_COMPLETED, ) @@ -334,7 +321,7 @@ class VimAdminThread(threading.Thread): self.logger.error("renew_locks task exception: {}".format(exc)) self.aiomain_task_renew_lock = None except asyncio.CancelledError: - pass + self.logger.exception("asyncio.CancelledError occured.") except Exception as e: if self.to_terminate: @@ -347,7 +334,7 @@ class VimAdminThread(threading.Thread): ) kafka_working = False - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) def run(self): """ @@ -370,11 +357,10 @@ class VimAdminThread(threading.Thread): ) ) - self.lock_renew.start(self.db, self.loop) + self.lock_renew.start(self.db) if not self.msg: config_msg = self.config["message"].copy() - config_msg["loop"] = self.loop if config_msg["driver"] == "local": self.msg = msglocal.MsgLocal() @@ -394,11 +380,7 @@ class VimAdminThread(threading.Thread): self.logger.info("Starting") while not self.to_terminate: try: - self.loop.run_until_complete( - asyncio.ensure_future(self.aiomain(), loop=self.loop) - ) - # except asyncio.CancelledError: - # break # if cancelled it should end, breaking loop + asyncio.run(self.main_task()) except Exception as e: if not self.to_terminate: self.logger.exception( @@ -409,6 +391,10 @@ class VimAdminThread(threading.Thread): self._stop() self.loop.close() + async def main_task(self): + task = asyncio.ensure_future(self.aiomain()) + await task + async def _msg_callback(self, topic, command, params): """ Callback to process a received message from kafka @@ -472,12 +458,12 @@ class VimAdminThread(threading.Thread): self.lock_renew.to_terminate = True if self.aiomain_task_kafka: - self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel) + self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel()) if self.aiomain_task_vim: - self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel) + self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel()) if self.aiomain_task_renew_lock: - self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel) + self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel()) self.lock_renew.stop()