Bug 2217 fixed: modified the cloud-init merge configs
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index 17bfb20..e34cce4 100644 (file)
@@ -19,15 +19,15 @@ It is based on asyncio.
 It is in charge of load tasks assigned to VIMs that nobody is in chage of it
 """
 
 It is in charge of load tasks assigned to VIMs that nobody is in chage of it
 """
 
-import logging
-import threading
 import asyncio
 from http import HTTPStatus
 import asyncio
 from http import HTTPStatus
+import logging
+import threading
+from time import time
 
 
-from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common import dbmemory, dbmongo, msgkafka, msglocal
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
-from time import time
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -39,7 +39,6 @@ class VimAdminException(Exception):
 
 
 class LockRenew:
 
 
 class LockRenew:
-
     renew_list = []
     # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
     # be renewed. The time order is achieved as it is appended at the end
     renew_list = []
     # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
     # be renewed. The time order is achieved as it is appended at the end
@@ -52,15 +51,13 @@ class LockRenew:
         self.config = config
         self.logger = logger
         self.to_terminate = False
         self.config = config
         self.logger = logger
         self.to_terminate = False
-        self.loop = None
         self.db = None
         self.task_locked_time = config["global"]["task_locked_time"]
         self.task_relock_time = config["global"]["task_relock_time"]
         self.task_max_locked_time = config["global"]["task_max_locked_time"]
 
         self.db = None
         self.task_locked_time = config["global"]["task_locked_time"]
         self.task_relock_time = config["global"]["task_relock_time"]
         self.task_max_locked_time = config["global"]["task_max_locked_time"]
 
-    def start(self, db, loop):
+    def start(self, db):
         self.db = db
         self.db = db
-        self.loop = loop
 
     @staticmethod
     def add_lock_object(database_table, database_object, thread_object):
 
     @staticmethod
     def add_lock_object(database_table, database_object, thread_object):
@@ -90,9 +87,7 @@ class LockRenew:
     async def renew_locks(self):
         while not self.to_terminate:
             if not self.renew_list:
     async def renew_locks(self):
         while not self.to_terminate:
             if not self.renew_list:
-                await asyncio.sleep(
-                    self.task_locked_time - self.task_relock_time, loop=self.loop
-                )
+                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                 continue
 
             lock_object = self.renew_list[0]
                 continue
 
             lock_object = self.renew_list[0]
@@ -152,7 +147,7 @@ class LockRenew:
                         )
             else:
                 # wait until it is time to re-lock it
                         )
             else:
                 # wait until it is time to re-lock it
-                await asyncio.sleep(time_to_relock, loop=self.loop)
+                await asyncio.sleep(time_to_relock)
 
     def stop(self):
         # unlock all locked items
 
     def stop(self):
         # unlock all locked items
@@ -269,37 +264,31 @@ class VimAdminThread(threading.Thread):
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
 
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
 
-            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED)
 
     async def aiomain(self):
         kafka_working = True
         while not self.to_terminate:
             try:
                 if not self.aiomain_task_kafka:
 
     async def aiomain(self):
         kafka_working = True
         while not self.to_terminate:
             try:
                 if not self.aiomain_task_kafka:
-                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
-                    await self.msg.aiowrite(
-                        "vim_account", "echo", "dummy message", loop=self.loop
-                    )
+                    for kafka_topic in self.kafka_topics:
+                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
                         self.msg.aioread(
                             self.kafka_topics,
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
                         self.msg.aioread(
                             self.kafka_topics,
-                            loop=self.loop,
                             group_id=False,
                             aiocallback=self._msg_callback,
                         ),
                             group_id=False,
                             aiocallback=self._msg_callback,
                         ),
-                        loop=self.loop,
                     )
 
                 if not self.aiomain_task_vim:
                     )
 
                 if not self.aiomain_task_vim:
-                    self.aiomain_task_vim = asyncio.ensure_future(
-                        self.vim_watcher(), loop=self.loop
-                    )
+                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())
 
                 if not self.aiomain_task_renew_lock:
                     self.aiomain_task_renew_lock = asyncio.ensure_future(
 
                 if not self.aiomain_task_renew_lock:
                     self.aiomain_task_renew_lock = asyncio.ensure_future(
-                        self.lock_renew.renew_locks(), loop=self.loop
+                        self.lock_renew.renew_locks()
                     )
 
                 done, _ = await asyncio.wait(
                     )
 
                 done, _ = await asyncio.wait(
@@ -309,7 +298,6 @@ class VimAdminThread(threading.Thread):
                         self.aiomain_task_renew_lock,
                     ],
                     timeout=None,
                         self.aiomain_task_renew_lock,
                     ],
                     timeout=None,
-                    loop=self.loop,
                     return_when=asyncio.FIRST_COMPLETED,
                 )
 
                     return_when=asyncio.FIRST_COMPLETED,
                 )
 
@@ -333,7 +321,7 @@ class VimAdminThread(threading.Thread):
                         self.logger.error("renew_locks task exception: {}".format(exc))
                         self.aiomain_task_renew_lock = None
                 except asyncio.CancelledError:
                         self.logger.error("renew_locks task exception: {}".format(exc))
                         self.aiomain_task_renew_lock = None
                 except asyncio.CancelledError:
-                    pass
+                    self.logger.exception("asyncio.CancelledError occured.")
 
             except Exception as e:
                 if self.to_terminate:
 
             except Exception as e:
                 if self.to_terminate:
@@ -346,7 +334,7 @@ class VimAdminThread(threading.Thread):
                     )
                     kafka_working = False
 
                     )
                     kafka_working = False
 
-            await asyncio.sleep(10, loop=self.loop)
+            await asyncio.sleep(10)
 
     def run(self):
         """
 
     def run(self):
         """
@@ -369,11 +357,10 @@ class VimAdminThread(threading.Thread):
                         )
                     )
 
                         )
                     )
 
-            self.lock_renew.start(self.db, self.loop)
+            self.lock_renew.start(self.db)
 
             if not self.msg:
                 config_msg = self.config["message"].copy()
 
             if not self.msg:
                 config_msg = self.config["message"].copy()
-                config_msg["loop"] = self.loop
 
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
 
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
@@ -393,11 +380,7 @@ class VimAdminThread(threading.Thread):
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
-                self.loop.run_until_complete(
-                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
-                )
-            # except asyncio.CancelledError:
-            #     break  # if cancelled it should end, breaking loop
+                asyncio.run(self.main_task())
             except Exception as e:
                 if not self.to_terminate:
                     self.logger.exception(
             except Exception as e:
                 if not self.to_terminate:
                     self.logger.exception(
@@ -408,6 +391,10 @@ class VimAdminThread(threading.Thread):
         self._stop()
         self.loop.close()
 
         self._stop()
         self.loop.close()
 
+    async def main_task(self):
+        task = asyncio.ensure_future(self.aiomain())
+        await task
+
     async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
     async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
@@ -471,12 +458,12 @@ class VimAdminThread(threading.Thread):
         self.lock_renew.to_terminate = True
 
         if self.aiomain_task_kafka:
         self.lock_renew.to_terminate = True
 
         if self.aiomain_task_kafka:
-            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel())
 
         if self.aiomain_task_vim:
 
         if self.aiomain_task_vim:
-            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel())
 
         if self.aiomain_task_renew_lock:
 
         if self.aiomain_task_renew_lock:
-            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel())
 
         self.lock_renew.stop()
 
         self.lock_renew.stop()