Disable the check of the release notes
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index 17bfb20..e34cce4 100644 (file)
@@ -19,15 +19,15 @@ It is based on asyncio.
 It is in charge of load tasks assigned to VIMs that nobody is in chage of it
 """
 
-import logging
-import threading
 import asyncio
 from http import HTTPStatus
+import logging
+import threading
+from time import time
 
-from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common import dbmemory, dbmongo, msgkafka, msglocal
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
-from time import time
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -39,7 +39,6 @@ class VimAdminException(Exception):
 
 
 class LockRenew:
-
     renew_list = []
     # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
     # be renewed. The time order is achieved as it is appended at the end
@@ -52,15 +51,13 @@ class LockRenew:
         self.config = config
         self.logger = logger
         self.to_terminate = False
-        self.loop = None
         self.db = None
         self.task_locked_time = config["global"]["task_locked_time"]
         self.task_relock_time = config["global"]["task_relock_time"]
         self.task_max_locked_time = config["global"]["task_max_locked_time"]
 
-    def start(self, db, loop):
+    def start(self, db):
         self.db = db
-        self.loop = loop
 
     @staticmethod
     def add_lock_object(database_table, database_object, thread_object):
@@ -90,9 +87,7 @@ class LockRenew:
     async def renew_locks(self):
         while not self.to_terminate:
             if not self.renew_list:
-                await asyncio.sleep(
-                    self.task_locked_time - self.task_relock_time, loop=self.loop
-                )
+                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                 continue
 
             lock_object = self.renew_list[0]
@@ -152,7 +147,7 @@ class LockRenew:
                         )
             else:
                 # wait until it is time to re-lock it
-                await asyncio.sleep(time_to_relock, loop=self.loop)
+                await asyncio.sleep(time_to_relock)
 
     def stop(self):
         # unlock all locked items
@@ -269,37 +264,31 @@ class VimAdminThread(threading.Thread):
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
 
-            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED)
 
     async def aiomain(self):
         kafka_working = True
         while not self.to_terminate:
             try:
                 if not self.aiomain_task_kafka:
-                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
-                    await self.msg.aiowrite(
-                        "vim_account", "echo", "dummy message", loop=self.loop
-                    )
+                    for kafka_topic in self.kafka_topics:
+                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
                         self.msg.aioread(
                             self.kafka_topics,
-                            loop=self.loop,
                             group_id=False,
                             aiocallback=self._msg_callback,
                         ),
-                        loop=self.loop,
                     )
 
                 if not self.aiomain_task_vim:
-                    self.aiomain_task_vim = asyncio.ensure_future(
-                        self.vim_watcher(), loop=self.loop
-                    )
+                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())
 
                 if not self.aiomain_task_renew_lock:
                     self.aiomain_task_renew_lock = asyncio.ensure_future(
-                        self.lock_renew.renew_locks(), loop=self.loop
+                        self.lock_renew.renew_locks()
                     )
 
                 done, _ = await asyncio.wait(
@@ -309,7 +298,6 @@ class VimAdminThread(threading.Thread):
                         self.aiomain_task_renew_lock,
                     ],
                     timeout=None,
-                    loop=self.loop,
                     return_when=asyncio.FIRST_COMPLETED,
                 )
 
@@ -333,7 +321,7 @@ class VimAdminThread(threading.Thread):
                         self.logger.error("renew_locks task exception: {}".format(exc))
                         self.aiomain_task_renew_lock = None
                 except asyncio.CancelledError:
-                    pass
+                    self.logger.exception("asyncio.CancelledError occured.")
 
             except Exception as e:
                 if self.to_terminate:
@@ -346,7 +334,7 @@ class VimAdminThread(threading.Thread):
                     )
                     kafka_working = False
 
-            await asyncio.sleep(10, loop=self.loop)
+            await asyncio.sleep(10)
 
     def run(self):
         """
@@ -369,11 +357,10 @@ class VimAdminThread(threading.Thread):
                         )
                     )
 
-            self.lock_renew.start(self.db, self.loop)
+            self.lock_renew.start(self.db)
 
             if not self.msg:
                 config_msg = self.config["message"].copy()
-                config_msg["loop"] = self.loop
 
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
@@ -393,11 +380,7 @@ class VimAdminThread(threading.Thread):
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
-                self.loop.run_until_complete(
-                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
-                )
-            # except asyncio.CancelledError:
-            #     break  # if cancelled it should end, breaking loop
+                asyncio.run(self.main_task())
             except Exception as e:
                 if not self.to_terminate:
                     self.logger.exception(
@@ -408,6 +391,10 @@ class VimAdminThread(threading.Thread):
         self._stop()
         self.loop.close()
 
+    async def main_task(self):
+        task = asyncio.ensure_future(self.aiomain())
+        await task
+
     async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
@@ -471,12 +458,12 @@ class VimAdminThread(threading.Thread):
         self.lock_renew.to_terminate = True
 
         if self.aiomain_task_kafka:
-            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel())
 
         if self.aiomain_task_vim:
-            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel())
 
         if self.aiomain_task_renew_lock:
-            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel())
 
         self.lock_renew.stop()