Fix bug 1537 to send dummy message to all kafka topics during startup
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index e843c80..d875272 100644 (file)
@@ -33,7 +33,6 @@ __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 class VimAdminException(Exception):
-
     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
         self.http_code = http_code
         Exception.__init__(self, message)
@@ -78,9 +77,10 @@ class LockRenew:
             "initial_lock_time": database_object["locked_at"],
             "locked_at": database_object["locked_at"],
             "thread": thread_object,
-            "unlocked": False  # True when it is not needed any more
+            "unlocked": False,  # True when it is not needed any more
         }
         LockRenew.renew_list.append(lock_object)
+
         return lock_object
 
     @staticmethod
@@ -90,36 +90,66 @@ class LockRenew:
     async def renew_locks(self):
         while not self.to_terminate:
             if not self.renew_list:
-                await asyncio.sleep(self.task_locked_time - self.task_relock_time, loop=self.loop)
+                await asyncio.sleep(
+                    self.task_locked_time - self.task_relock_time, loop=self.loop
+                )
                 continue
+
             lock_object = self.renew_list[0]
-            if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
+
+            if (
+                lock_object["unlocked"]
+                or not lock_object["thread"]
+                or not lock_object["thread"].is_alive()
+            ):
                 # task has been finished or locker thread is dead, not needed to re-locked.
                 self.renew_list.pop(0)
                 continue
 
             locked_at = lock_object["locked_at"]
             now = time()
-            time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
+            time_to_relock = (
+                locked_at + self.task_locked_time - self.task_relock_time - now
+            )
+
             if time_to_relock < 1:
                 if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                     self.renew_list.pop(0)
                     # re-lock
                     new_locked_at = locked_at + self.task_locked_time
+
                     try:
-                        if self.db.set_one(lock_object["table"],
-                                           update_dict={"locked_at": new_locked_at, "modified_at": now},
-                                           q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
-                                           fail_on_empty=False):
-                            self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
+                        if self.db.set_one(
+                            lock_object["table"],
+                            update_dict={
+                                "locked_at": new_locked_at,
+                                "modified_at": now,
+                            },
+                            q_filter={
+                                "_id": lock_object["_id"],
+                                "locked_at": locked_at,
+                            },
+                            fail_on_empty=False,
+                        ):
+                            self.logger.debug(
+                                "Renew lock for {}.{}".format(
+                                    lock_object["table"], lock_object["_id"]
+                                )
+                            )
                             lock_object["locked_at"] = new_locked_at
                             self.renew_list.append(lock_object)
                         else:
-                            self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
-                                                                                  lock_object["_id"]))
+                            self.logger.info(
+                                "Cannot renew lock for {}.{}".format(
+                                    lock_object["table"], lock_object["_id"]
+                                )
+                            )
                     except Exception as e:
-                        self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
-                            lock_object["table"], lock_object["_id"], e))
+                        self.logger.error(
+                            "Exception when trying to renew lock for {}.{}: {}".format(
+                                lock_object["table"], lock_object["_id"], e
+                            )
+                        )
             else:
                 # wait until it is time to re-lock it
                 await asyncio.sleep(time_to_relock, loop=self.loop)
@@ -127,12 +157,17 @@ class LockRenew:
     def stop(self):
         # unlock all locked items
         now = time()
+
         for lock_object in self.renew_list:
             locked_at = lock_object["locked_at"]
+
             if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
-                self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
-                                q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
-                                fail_on_empty=False)
+                self.db.set_one(
+                    lock_object["table"],
+                    update_dict={"locked_at": 0},
+                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+                    fail_on_empty=False,
+                )
 
 
 class VimAdminThread(threading.Thread):
@@ -156,20 +191,25 @@ class VimAdminThread(threading.Thread):
         self.last_rotask_time = 0
         self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
         self.logger = logging.getLogger("ro.vimadmin")
-        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
-        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
+        # asyncio task for receiving vim actions from kafka bus
+        self.aiomain_task_kafka = None
+        # asyncio task for watching ro_tasks not processed by nobody
+        self.aiomain_task_vim = None
         self.aiomain_task_renew_lock = None
         # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
         self.lock_renew = LockRenew(config, self.logger)
         self.task_locked_time = config["global"]["task_locked_time"]
 
     async def vim_watcher(self):
-        """ Reads database periodically looking for tasks not processed by nobody because of a reboot
+        """Reads database periodically looking for tasks not processed by nobody because of a reboot
         in order to load this vim"""
         # firstly read VIMS not processed
         for target_database in ("vim_accounts", "wim_accounts", "sdns"):
-            unattended_targets = self.db.get_list(target_database,
-                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
+            unattended_targets = self.db.get_list(
+                target_database,
+                q_filter={"_admin.operations.operationState": "PROCESSING"},
+            )
+
             for target in unattended_targets:
                 target_id = "{}:{}".format(target_database[:3], target["_id"])
                 self.logger.info("ordered to check {}".format(target_id))
@@ -178,37 +218,57 @@ class VimAdminThread(threading.Thread):
         while not self.to_terminate:
             now = time()
             processed_vims = []
+
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
-            ro_tasks = self.db.get_list("ro_tasks",
-                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
-                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                                  "locked_at.lt": now - self.task_locked_time,
-                                                  "to_check_at.gt": self.last_rotask_time,
-                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
+
+            ro_tasks = self.db.get_list(
+                "ro_tasks",
+                q_filter={
+                    "target_id.ncont": self.engine.get_assigned_vims(),
+                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+                    "locked_at.lt": now - self.task_locked_time,
+                    "to_check_at.gt": self.last_rotask_time,
+                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
+                },
+            )
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
+
             for ro_task in ro_tasks:
                 # if already checked ignore
                 if ro_task["target_id"] in processed_vims:
                     continue
+
                 processed_vims.append(ro_task["target_id"])
+
                 # if already assigned ignore
                 if ro_task["target_id"] in self.engine.get_assigned_vims():
                     continue
+
                 # if there is some task locked on this VIM, there is an RO working on it, so ignore
-                if self.db.get_list("ro_tasks",
-                                    q_filter={"target_id": ro_task["target_id"],
-                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                              "locked_at.gt": now - self.task_locked_time}):
+                if self.db.get_list(
+                    "ro_tasks",
+                    q_filter={
+                        "target_id": ro_task["target_id"],
+                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+                        "locked_at.gt": now - self.task_locked_time,
+                    },
+                ):
                     continue
+
                 # unattended, assign vim
                 self.engine.assign_vim(ro_task["target_id"])
-                self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+                self.logger.debug(
+                    "ordered to load {}. Inactivity detected".format(
+                        ro_task["target_id"]
+                    )
+                )
 
             # every 2 hours check if there are vims without any ro_task and unload it
             if now > self.next_check_unused_vim:
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
+
             await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
 
     async def aiomain(self):
@@ -217,32 +277,58 @@ class VimAdminThread(threading.Thread):
             try:
                 if not self.aiomain_task_kafka:
                     # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
-                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
+                    for kafka_topic in self.kafka_topics:
+                        await self.msg.aiowrite(
+                            kafka_topic, "echo", "dummy message", loop=self.loop
+                        )
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
-                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
-                                         aiocallback=self._msg_callback),
-                        loop=self.loop)
+                        self.msg.aioread(
+                            self.kafka_topics,
+                            loop=self.loop,
+                            group_id=False,
+                            aiocallback=self._msg_callback,
+                        ),
+                        loop=self.loop,
+                    )
+
                 if not self.aiomain_task_vim:
                     self.aiomain_task_vim = asyncio.ensure_future(
-                        self.vim_watcher(),
-                        loop=self.loop)
+                        self.vim_watcher(), loop=self.loop
+                    )
+
                 if not self.aiomain_task_renew_lock:
-                    self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop)
+                    self.aiomain_task_renew_lock = asyncio.ensure_future(
+                        self.lock_renew.renew_locks(), loop=self.loop
+                    )
 
                 done, _ = await asyncio.wait(
-                    [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
-                    timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                    [
+                        self.aiomain_task_kafka,
+                        self.aiomain_task_vim,
+                        self.aiomain_task_renew_lock,
+                    ],
+                    timeout=None,
+                    loop=self.loop,
+                    return_when=asyncio.FIRST_COMPLETED,
+                )
+
                 try:
                     if self.aiomain_task_kafka in done:
                         exc = self.aiomain_task_kafka.exception()
-                        self.logger.error("kafka subscription task exception: {}".format(exc))
+                        self.logger.error(
+                            "kafka subscription task exception: {}".format(exc)
+                        )
                         self.aiomain_task_kafka = None
+
                     if self.aiomain_task_vim in done:
                         exc = self.aiomain_task_vim.exception()
-                        self.logger.error("vim_account watcher task exception: {}".format(exc))
+                        self.logger.error(
+                            "vim_account watcher task exception: {}".format(exc)
+                        )
                         self.aiomain_task_vim = None
+
                     if self.aiomain_task_renew_lock in done:
                         exc = self.aiomain_task_renew_lock.exception()
                         self.logger.error("renew_locks task exception: {}".format(exc))
@@ -253,10 +339,14 @@ class VimAdminThread(threading.Thread):
             except Exception as e:
                 if self.to_terminate:
                     return
+
                 if kafka_working:
                     # logging only first time
-                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+                    self.logger.critical(
+                        "Error accessing kafka '{}'. Retrying ...".format(e)
+                    )
                     kafka_working = False
+
             await asyncio.sleep(10, loop=self.loop)
 
     def run(self):
@@ -274,13 +364,18 @@ class VimAdminThread(threading.Thread):
                     self.db = dbmemory.DbMemory()
                     self.db.db_connect(self.config["database"])
                 else:
-                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
-                        self.config["database"]["driver"]))
+                    raise VimAdminException(
+                        "Invalid configuration param '{}' at '[database]':'driver'".format(
+                            self.config["database"]["driver"]
+                        )
+                    )
+
             self.lock_renew.start(self.db, self.loop)
 
             if not self.msg:
                 config_msg = self.config["message"].copy()
                 config_msg["loop"] = self.loop
+
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
                     self.msg.connect(config_msg)
@@ -288,20 +383,27 @@ class VimAdminThread(threading.Thread):
                     self.msg = msgkafka.MsgKafka()
                     self.msg.connect(config_msg)
                 else:
-                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
-                        config_msg["driver"]))
+                    raise VimAdminException(
+                        "Invalid configuration param '{}' at '[message]':'driver'".format(
+                            config_msg["driver"]
+                        )
+                    )
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
-                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
+                self.loop.run_until_complete(
+                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
+                )
             # except asyncio.CancelledError:
             #     break  # if cancelled it should end, breaking loop
             except Exception as e:
                 if not self.to_terminate:
-                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+                    self.logger.exception(
+                        "Exception '{}' at messaging read loop".format(e), exc_info=True
+                    )
 
         self.logger.info("Finishing")
         self._stop()
@@ -318,9 +420,11 @@ class VimAdminThread(threading.Thread):
         try:
             if command == "echo":
                 return
+
             if topic in self.kafka_topics:
-                target = topic[0:3]   # vim, wim or sdn
+                target = topic[0:3]  # vim, wim or sdn
                 target_id = target + ":" + params["_id"]
+
                 if command in ("edited", "edit"):
                     self.engine.reload_vim(target_id)
                     self.logger.debug("ordered to reload {}".format(target_id))
@@ -330,12 +434,19 @@ class VimAdminThread(threading.Thread):
                 elif command in ("create", "created"):
                     self.engine.check_vim(target_id)
                     self.logger.debug("ordered to check {}".format(target_id))
-
         except (DbException, MsgException) as e:
-            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+            self.logger.error(
+                "Error while processing topic={} command={}: {}".format(
+                    topic, command, e
+                )
+            )
         except Exception as e:
-            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
-                                  exc_info=True)
+            self.logger.exception(
+                "Exception while processing topic={} command={}: {}".format(
+                    topic, command, e
+                ),
+                exc_info=True,
+            )
 
     def _stop(self):
         """
@@ -345,6 +456,7 @@ class VimAdminThread(threading.Thread):
         try:
             if self.db:
                 self.db.db_disconnect()
+
             if self.msg:
                 self.msg.disconnect()
         except (DbException, MsgException) as e:
@@ -358,10 +470,14 @@ class VimAdminThread(threading.Thread):
         """
         self.to_terminate = True
         self.lock_renew.to_terminate = True
+
         if self.aiomain_task_kafka:
             self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+
         if self.aiomain_task_vim:
             self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+
         if self.aiomain_task_renew_lock:
             self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+
         self.lock_renew.stop()