Ubuntu 22.04 and Python 3.10 preparation
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index b6e34e1..21e6281 100644 (file)
@@ -51,15 +51,13 @@ class LockRenew:
         self.config = config
         self.logger = logger
         self.to_terminate = False
-        self.loop = None
         self.db = None
         self.task_locked_time = config["global"]["task_locked_time"]
         self.task_relock_time = config["global"]["task_relock_time"]
         self.task_max_locked_time = config["global"]["task_max_locked_time"]
 
-    def start(self, db, loop):
+    def start(self, db):
         self.db = db
-        self.loop = loop
 
     @staticmethod
     def add_lock_object(database_table, database_object, thread_object):
@@ -89,9 +87,7 @@ class LockRenew:
     async def renew_locks(self):
         while not self.to_terminate:
             if not self.renew_list:
-                await asyncio.sleep(
-                    self.task_locked_time - self.task_relock_time, loop=self.loop
-                )
+                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                 continue
 
             lock_object = self.renew_list[0]
@@ -151,7 +147,7 @@ class LockRenew:
                         )
             else:
                 # wait until it is time to re-lock it
-                await asyncio.sleep(time_to_relock, loop=self.loop)
+                await asyncio.sleep(time_to_relock)
 
     def stop(self):
         # unlock all locked items
@@ -268,38 +264,31 @@ class VimAdminThread(threading.Thread):
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
 
-            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED)
 
     async def aiomain(self):
         kafka_working = True
         while not self.to_terminate:
             try:
                 if not self.aiomain_task_kafka:
-                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
                     for kafka_topic in self.kafka_topics:
-                        await self.msg.aiowrite(
-                            kafka_topic, "echo", "dummy message", loop=self.loop
-                        )
+                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
                         self.msg.aioread(
                             self.kafka_topics,
-                            loop=self.loop,
                             group_id=False,
                             aiocallback=self._msg_callback,
                         ),
-                        loop=self.loop,
                     )
 
                 if not self.aiomain_task_vim:
-                    self.aiomain_task_vim = asyncio.ensure_future(
-                        self.vim_watcher(), loop=self.loop
-                    )
+                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())
 
                 if not self.aiomain_task_renew_lock:
                     self.aiomain_task_renew_lock = asyncio.ensure_future(
-                        self.lock_renew.renew_locks(), loop=self.loop
+                        self.lock_renew.renew_locks()
                     )
 
                 done, _ = await asyncio.wait(
@@ -309,7 +298,6 @@ class VimAdminThread(threading.Thread):
                         self.aiomain_task_renew_lock,
                     ],
                     timeout=None,
-                    loop=self.loop,
                     return_when=asyncio.FIRST_COMPLETED,
                 )
 
@@ -346,7 +334,7 @@ class VimAdminThread(threading.Thread):
                     )
                     kafka_working = False
 
-            await asyncio.sleep(10, loop=self.loop)
+            await asyncio.sleep(10)
 
     def run(self):
         """
@@ -369,11 +357,10 @@ class VimAdminThread(threading.Thread):
                         )
                     )
 
-            self.lock_renew.start(self.db, self.loop)
+            self.lock_renew.start(self.db)
 
             if not self.msg:
                 config_msg = self.config["message"].copy()
-                config_msg["loop"] = self.loop
 
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
@@ -393,11 +380,7 @@ class VimAdminThread(threading.Thread):
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
-                self.loop.run_until_complete(
-                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
-                )
-            # except asyncio.CancelledError:
-            #     break  # if cancelled it should end, breaking loop
+                asyncio.run_coroutine_threadsafe(self.main_task(), self.loop)
             except Exception as e:
                 if not self.to_terminate:
                     self.logger.exception(
@@ -408,6 +391,10 @@ class VimAdminThread(threading.Thread):
         self._stop()
         self.loop.close()
 
+    async def main_task(self):
+        task = asyncio.ensure_future(self.aiomain())
+        await task
+
     async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
@@ -471,12 +458,12 @@ class VimAdminThread(threading.Thread):
         self.lock_renew.to_terminate = True
 
         if self.aiomain_task_kafka:
-            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+            self.aiomain_task_kafka.cancel()
 
         if self.aiomain_task_vim:
-            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+            self.aiomain_task_vim.cancel()
 
         if self.aiomain_task_renew_lock:
-            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+            self.aiomain_task_renew_lock.cancel()
 
         self.lock_renew.stop()