Fix bug 1537 to send dummy message to all kafka topics during startup
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index d7a7c1d..d875272 100644 (file)
@@ -27,22 +27,152 @@ from http import HTTPStatus
 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
-from osm_ng_ro.ns import NsException
 from time import time
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 class VimAdminException(Exception):
 from time import time
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 class VimAdminException(Exception):
-
     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
         self.http_code = http_code
         Exception.__init__(self, message)
 
 
     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
         self.http_code = http_code
         Exception.__init__(self, message)
 
 
+class LockRenew:
+
+    renew_list = []
+    # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
+    # be renewed. The time order is achieved as it is appended at the end
+
+    def __init__(self, config, logger):
+        """
+        Constructor of class
+        :param config: configuration parameters of database and messaging
+        """
+        self.config = config
+        self.logger = logger
+        self.to_terminate = False
+        self.loop = None
+        self.db = None
+        self.task_locked_time = config["global"]["task_locked_time"]
+        self.task_relock_time = config["global"]["task_relock_time"]
+        self.task_max_locked_time = config["global"]["task_max_locked_time"]
+
+    def start(self, db, loop):
+        self.db = db
+        self.loop = loop
+
+    @staticmethod
+    def add_lock_object(database_table, database_object, thread_object):
+        """
+        Insert a task to renew the locking
+        :param database_table: database collection where to maintain the lock
+        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
+        :param thread_object: Thread object that has locked to check if it is alive
+        :return: a locked_object needed for calling remove_lock_object. It will contain uptodya database 'locked_at'
+        """
+        lock_object = {
+            "table": database_table,
+            "_id": database_object["_id"],
+            "initial_lock_time": database_object["locked_at"],
+            "locked_at": database_object["locked_at"],
+            "thread": thread_object,
+            "unlocked": False,  # True when it is not needed any more
+        }
+        LockRenew.renew_list.append(lock_object)
+
+        return lock_object
+
+    @staticmethod
+    def remove_lock_object(lock_object):
+        lock_object["unlocked"] = True
+
+    async def renew_locks(self):
+        while not self.to_terminate:
+            if not self.renew_list:
+                await asyncio.sleep(
+                    self.task_locked_time - self.task_relock_time, loop=self.loop
+                )
+                continue
+
+            lock_object = self.renew_list[0]
+
+            if (
+                lock_object["unlocked"]
+                or not lock_object["thread"]
+                or not lock_object["thread"].is_alive()
+            ):
+                # task has been finished or locker thread is dead, not needed to re-locked.
+                self.renew_list.pop(0)
+                continue
+
+            locked_at = lock_object["locked_at"]
+            now = time()
+            time_to_relock = (
+                locked_at + self.task_locked_time - self.task_relock_time - now
+            )
+
+            if time_to_relock < 1:
+                if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
+                    self.renew_list.pop(0)
+                    # re-lock
+                    new_locked_at = locked_at + self.task_locked_time
+
+                    try:
+                        if self.db.set_one(
+                            lock_object["table"],
+                            update_dict={
+                                "locked_at": new_locked_at,
+                                "modified_at": now,
+                            },
+                            q_filter={
+                                "_id": lock_object["_id"],
+                                "locked_at": locked_at,
+                            },
+                            fail_on_empty=False,
+                        ):
+                            self.logger.debug(
+                                "Renew lock for {}.{}".format(
+                                    lock_object["table"], lock_object["_id"]
+                                )
+                            )
+                            lock_object["locked_at"] = new_locked_at
+                            self.renew_list.append(lock_object)
+                        else:
+                            self.logger.info(
+                                "Cannot renew lock for {}.{}".format(
+                                    lock_object["table"], lock_object["_id"]
+                                )
+                            )
+                    except Exception as e:
+                        self.logger.error(
+                            "Exception when trying to renew lock for {}.{}: {}".format(
+                                lock_object["table"], lock_object["_id"], e
+                            )
+                        )
+            else:
+                # wait until it is time to re-lock it
+                await asyncio.sleep(time_to_relock, loop=self.loop)
+
+    def stop(self):
+        # unlock all locked items
+        now = time()
+
+        for lock_object in self.renew_list:
+            locked_at = lock_object["locked_at"]
+
+            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
+                self.db.set_one(
+                    lock_object["table"],
+                    update_dict={"locked_at": 0},
+                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+                    fail_on_empty=False,
+                )
+
+
 class VimAdminThread(threading.Thread):
 class VimAdminThread(threading.Thread):
-    MAX_TIME_LOCKED = 3600  # 1h
-    MAX_TIME_UNATTENDED = 60  # 600  # 10min
+    MAX_TIME_UNATTENDED = 600  # 10min
+    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
     kafka_topics = ("vim_account", "wim_account", "sdn")
 
     def __init__(self, config, engine):
     kafka_topics = ("vim_account", "wim_account", "sdn")
 
     def __init__(self, config, engine):
@@ -59,30 +189,87 @@ class VimAdminThread(threading.Thread):
         self.engine = engine
         self.loop = None
         self.last_rotask_time = 0
         self.engine = engine
         self.loop = None
         self.last_rotask_time = 0
+        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
         self.logger = logging.getLogger("ro.vimadmin")
         self.logger = logging.getLogger("ro.vimadmin")
-        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
-        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
+        # asyncio task for receiving vim actions from kafka bus
+        self.aiomain_task_kafka = None
+        # asyncio task for watching ro_tasks not processed by nobody
+        self.aiomain_task_vim = None
+        self.aiomain_task_renew_lock = None
+        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
+        self.lock_renew = LockRenew(config, self.logger)
+        self.task_locked_time = config["global"]["task_locked_time"]
 
     async def vim_watcher(self):
 
     async def vim_watcher(self):
-        """ Reads database periodically looking for tasks not processed by nobody because of a restar
+        """Reads database periodically looking for tasks not processed by nobody because of a reboot
         in order to load this vim"""
         in order to load this vim"""
+        # firstly read VIMS not processed
+        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
+            unattended_targets = self.db.get_list(
+                target_database,
+                q_filter={"_admin.operations.operationState": "PROCESSING"},
+            )
+
+            for target in unattended_targets:
+                target_id = "{}:{}".format(target_database[:3], target["_id"])
+                self.logger.info("ordered to check {}".format(target_id))
+                self.engine.check_vim(target_id)
+
         while not self.to_terminate:
             now = time()
         while not self.to_terminate:
             now = time()
+            processed_vims = []
+
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
-            ro_tasks = self.db.get_list("ro_tasks",
-                                        q_filter={"target_id.ncont": self.engine.assignment_list,
-                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                                  "locked_at.lt": now - self.MAX_TIME_LOCKED,
-                                                  "to_check_at.gt": self.last_rotask_time,
-                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
+
+            ro_tasks = self.db.get_list(
+                "ro_tasks",
+                q_filter={
+                    "target_id.ncont": self.engine.get_assigned_vims(),
+                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+                    "locked_at.lt": now - self.task_locked_time,
+                    "to_check_at.gt": self.last_rotask_time,
+                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
+                },
+            )
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
+
             for ro_task in ro_tasks:
             for ro_task in ro_tasks:
-                if ro_task["target_id"] not in self.engine.assignment_list:
-                    self.engine.assign_vim(ro_task["target_id"])
-                    self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+                # if already checked ignore
+                if ro_task["target_id"] in processed_vims:
+                    continue
+
+                processed_vims.append(ro_task["target_id"])
+
+                # if already assigned ignore
+                if ro_task["target_id"] in self.engine.get_assigned_vims():
+                    continue
+
+                # if there is some task locked on this VIM, there is an RO working on it, so ignore
+                if self.db.get_list(
+                    "ro_tasks",
+                    q_filter={
+                        "target_id": ro_task["target_id"],
+                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+                        "locked_at.gt": now - self.task_locked_time,
+                    },
+                ):
+                    continue
 
 
-            await asyncio.sleep(300, loop=self.loop)
+                # unattended, assign vim
+                self.engine.assign_vim(ro_task["target_id"])
+                self.logger.debug(
+                    "ordered to load {}. Inactivity detected".format(
+                        ro_task["target_id"]
+                    )
+                )
+
+            # every 2 hours check if there are vims without any ro_task and unload it
+            if now > self.next_check_unused_vim:
+                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
+                self.engine.unload_unused_vims()
+
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
 
     async def aiomain(self):
         kafka_working = True
 
     async def aiomain(self):
         kafka_working = True
@@ -90,38 +277,76 @@ class VimAdminThread(threading.Thread):
             try:
                 if not self.aiomain_task_kafka:
                     # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
             try:
                 if not self.aiomain_task_kafka:
                     # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
-                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
+                    for kafka_topic in self.kafka_topics:
+                        await self.msg.aiowrite(
+                            kafka_topic, "echo", "dummy message", loop=self.loop
+                        )
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
-                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
-                                         aiocallback=self._msg_callback),
-                        loop=self.loop)
+                        self.msg.aioread(
+                            self.kafka_topics,
+                            loop=self.loop,
+                            group_id=False,
+                            aiocallback=self._msg_callback,
+                        ),
+                        loop=self.loop,
+                    )
+
                 if not self.aiomain_task_vim:
                     self.aiomain_task_vim = asyncio.ensure_future(
                 if not self.aiomain_task_vim:
                     self.aiomain_task_vim = asyncio.ensure_future(
-                        self.vim_watcher(),
-                        loop=self.loop)
-                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
-                                             timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                        self.vim_watcher(), loop=self.loop
+                    )
+
+                if not self.aiomain_task_renew_lock:
+                    self.aiomain_task_renew_lock = asyncio.ensure_future(
+                        self.lock_renew.renew_locks(), loop=self.loop
+                    )
+
+                done, _ = await asyncio.wait(
+                    [
+                        self.aiomain_task_kafka,
+                        self.aiomain_task_vim,
+                        self.aiomain_task_renew_lock,
+                    ],
+                    timeout=None,
+                    loop=self.loop,
+                    return_when=asyncio.FIRST_COMPLETED,
+                )
+
                 try:
                     if self.aiomain_task_kafka in done:
                         exc = self.aiomain_task_kafka.exception()
                 try:
                     if self.aiomain_task_kafka in done:
                         exc = self.aiomain_task_kafka.exception()
-                        self.logger.error("kafka subscription task exception: {}".format(exc))
+                        self.logger.error(
+                            "kafka subscription task exception: {}".format(exc)
+                        )
                         self.aiomain_task_kafka = None
                         self.aiomain_task_kafka = None
+
                     if self.aiomain_task_vim in done:
                         exc = self.aiomain_task_vim.exception()
                     if self.aiomain_task_vim in done:
                         exc = self.aiomain_task_vim.exception()
-                        self.logger.error("vim_account watcher task exception: {}".format(exc))
+                        self.logger.error(
+                            "vim_account watcher task exception: {}".format(exc)
+                        )
                         self.aiomain_task_vim = None
                         self.aiomain_task_vim = None
+
+                    if self.aiomain_task_renew_lock in done:
+                        exc = self.aiomain_task_renew_lock.exception()
+                        self.logger.error("renew_locks task exception: {}".format(exc))
+                        self.aiomain_task_renew_lock = None
                 except asyncio.CancelledError:
                     pass
 
             except Exception as e:
                 if self.to_terminate:
                     return
                 except asyncio.CancelledError:
                     pass
 
             except Exception as e:
                 if self.to_terminate:
                     return
+
                 if kafka_working:
                     # logging only first time
                 if kafka_working:
                     # logging only first time
-                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+                    self.logger.critical(
+                        "Error accessing kafka '{}'. Retrying ...".format(e)
+                    )
                     kafka_working = False
                     kafka_working = False
+
             await asyncio.sleep(10, loop=self.loop)
 
     def run(self):
             await asyncio.sleep(10, loop=self.loop)
 
     def run(self):
@@ -139,11 +364,18 @@ class VimAdminThread(threading.Thread):
                     self.db = dbmemory.DbMemory()
                     self.db.db_connect(self.config["database"])
                 else:
                     self.db = dbmemory.DbMemory()
                     self.db.db_connect(self.config["database"])
                 else:
-                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
-                        self.config["database"]["driver"]))
+                    raise VimAdminException(
+                        "Invalid configuration param '{}' at '[database]':'driver'".format(
+                            self.config["database"]["driver"]
+                        )
+                    )
+
+            self.lock_renew.start(self.db, self.loop)
+
             if not self.msg:
                 config_msg = self.config["message"].copy()
                 config_msg["loop"] = self.loop
             if not self.msg:
                 config_msg = self.config["message"].copy()
                 config_msg["loop"] = self.loop
+
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
                     self.msg.connect(config_msg)
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
                     self.msg.connect(config_msg)
@@ -151,22 +383,29 @@ class VimAdminThread(threading.Thread):
                     self.msg = msgkafka.MsgKafka()
                     self.msg.connect(config_msg)
                 else:
                     self.msg = msgkafka.MsgKafka()
                     self.msg.connect(config_msg)
                 else:
-                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
-                        config_msg["driver"]))
+                    raise VimAdminException(
+                        "Invalid configuration param '{}' at '[message]':'driver'".format(
+                            config_msg["driver"]
+                        )
+                    )
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
-        self.logger.debug("Starting")
+        self.logger.info("Starting")
         while not self.to_terminate:
             try:
         while not self.to_terminate:
             try:
-                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
+                self.loop.run_until_complete(
+                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
+                )
             # except asyncio.CancelledError:
             #     break  # if cancelled it should end, breaking loop
             except Exception as e:
                 if not self.to_terminate:
             # except asyncio.CancelledError:
             #     break  # if cancelled it should end, breaking loop
             except Exception as e:
                 if not self.to_terminate:
-                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+                    self.logger.exception(
+                        "Exception '{}' at messaging read loop".format(e), exc_info=True
+                    )
 
 
-        self.logger.debug("Finishing")
+        self.logger.info("Finishing")
         self._stop()
         self.loop.close()
 
         self._stop()
         self.loop.close()
 
@@ -181,9 +420,11 @@ class VimAdminThread(threading.Thread):
         try:
             if command == "echo":
                 return
         try:
             if command == "echo":
                 return
+
             if topic in self.kafka_topics:
             if topic in self.kafka_topics:
-                target = topic[0:3]   # vim, wim or sdn
+                target = topic[0:3]  # vim, wim or sdn
                 target_id = target + ":" + params["_id"]
                 target_id = target + ":" + params["_id"]
+
                 if command in ("edited", "edit"):
                     self.engine.reload_vim(target_id)
                     self.logger.debug("ordered to reload {}".format(target_id))
                 if command in ("edited", "edit"):
                     self.engine.reload_vim(target_id)
                     self.logger.debug("ordered to reload {}".format(target_id))
@@ -193,12 +434,19 @@ class VimAdminThread(threading.Thread):
                 elif command in ("create", "created"):
                     self.engine.check_vim(target_id)
                     self.logger.debug("ordered to check {}".format(target_id))
                 elif command in ("create", "created"):
                     self.engine.check_vim(target_id)
                     self.logger.debug("ordered to check {}".format(target_id))
-
-        except (NsException, DbException, MsgException) as e:
-            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+        except (DbException, MsgException) as e:
+            self.logger.error(
+                "Error while processing topic={} command={}: {}".format(
+                    topic, command, e
+                )
+            )
         except Exception as e:
         except Exception as e:
-            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
-                                  exc_info=True)
+            self.logger.exception(
+                "Exception while processing topic={} command={}: {}".format(
+                    topic, command, e
+                ),
+                exc_info=True,
+            )
 
     def _stop(self):
         """
 
     def _stop(self):
         """
@@ -208,6 +456,7 @@ class VimAdminThread(threading.Thread):
         try:
             if self.db:
                 self.db.db_disconnect()
         try:
             if self.db:
                 self.db.db_disconnect()
+
             if self.msg:
                 self.msg.disconnect()
         except (DbException, MsgException) as e:
             if self.msg:
                 self.msg.disconnect()
         except (DbException, MsgException) as e:
@@ -220,7 +469,15 @@ class VimAdminThread(threading.Thread):
         :return: None
         """
         self.to_terminate = True
         :return: None
         """
         self.to_terminate = True
+        self.lock_renew.to_terminate = True
+
         if self.aiomain_task_kafka:
             self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
         if self.aiomain_task_kafka:
             self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+
         if self.aiomain_task_vim:
             self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
         if self.aiomain_task_vim:
             self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+
+        if self.aiomain_task_renew_lock:
+            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+
+        self.lock_renew.stop()