Disable the check of the release notes
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index 5af766c..e34cce4 100644 (file)
@@ -19,29 +19,153 @@ It is based on asyncio.
 It is in charge of load tasks assigned to VIMs that nobody is in chage of it
 """
 
 It is in charge of load tasks assigned to VIMs that nobody is in chage of it
 """
 
-import logging
-import threading
 import asyncio
 from http import HTTPStatus
 import asyncio
 from http import HTTPStatus
+import logging
+import threading
+from time import time
 
 
-from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common import dbmemory, dbmongo, msgkafka, msglocal
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
-from osm_ng_ro.ns import NsException
-from time import time
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 class VimAdminException(Exception):
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 class VimAdminException(Exception):
-
     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
         self.http_code = http_code
         Exception.__init__(self, message)
 
 
     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
         self.http_code = http_code
         Exception.__init__(self, message)
 
 
+class LockRenew:
+    renew_list = []
+    # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
+    # be renewed. The time order is achieved as it is appended at the end
+
+    def __init__(self, config, logger):
+        """
+        Constructor of class
+        :param config: configuration parameters of database and messaging
+        """
+        self.config = config
+        self.logger = logger
+        self.to_terminate = False
+        self.db = None
+        self.task_locked_time = config["global"]["task_locked_time"]
+        self.task_relock_time = config["global"]["task_relock_time"]
+        self.task_max_locked_time = config["global"]["task_max_locked_time"]
+
+    def start(self, db):
+        self.db = db
+
+    @staticmethod
+    def add_lock_object(database_table, database_object, thread_object):
+        """
+        Insert a task to renew the locking
+        :param database_table: database collection where to maintain the lock
+        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
+        :param thread_object: Thread object that has locked to check if it is alive
+        :return: a locked_object needed for calling remove_lock_object. It will contain uptodya database 'locked_at'
+        """
+        lock_object = {
+            "table": database_table,
+            "_id": database_object["_id"],
+            "initial_lock_time": database_object["locked_at"],
+            "locked_at": database_object["locked_at"],
+            "thread": thread_object,
+            "unlocked": False,  # True when it is not needed any more
+        }
+        LockRenew.renew_list.append(lock_object)
+
+        return lock_object
+
+    @staticmethod
+    def remove_lock_object(lock_object):
+        lock_object["unlocked"] = True
+
+    async def renew_locks(self):
+        while not self.to_terminate:
+            if not self.renew_list:
+                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
+                continue
+
+            lock_object = self.renew_list[0]
+
+            if (
+                lock_object["unlocked"]
+                or not lock_object["thread"]
+                or not lock_object["thread"].is_alive()
+            ):
+                # task has been finished or locker thread is dead, not needed to re-locked.
+                self.renew_list.pop(0)
+                continue
+
+            locked_at = lock_object["locked_at"]
+            now = time()
+            time_to_relock = (
+                locked_at + self.task_locked_time - self.task_relock_time - now
+            )
+
+            if time_to_relock < 1:
+                if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
+                    self.renew_list.pop(0)
+                    # re-lock
+                    new_locked_at = locked_at + self.task_locked_time
+
+                    try:
+                        if self.db.set_one(
+                            lock_object["table"],
+                            update_dict={
+                                "locked_at": new_locked_at,
+                                "modified_at": now,
+                            },
+                            q_filter={
+                                "_id": lock_object["_id"],
+                                "locked_at": locked_at,
+                            },
+                            fail_on_empty=False,
+                        ):
+                            self.logger.debug(
+                                "Renew lock for {}.{}".format(
+                                    lock_object["table"], lock_object["_id"]
+                                )
+                            )
+                            lock_object["locked_at"] = new_locked_at
+                            self.renew_list.append(lock_object)
+                        else:
+                            self.logger.info(
+                                "Cannot renew lock for {}.{}".format(
+                                    lock_object["table"], lock_object["_id"]
+                                )
+                            )
+                    except Exception as e:
+                        self.logger.error(
+                            "Exception when trying to renew lock for {}.{}: {}".format(
+                                lock_object["table"], lock_object["_id"], e
+                            )
+                        )
+            else:
+                # wait until it is time to re-lock it
+                await asyncio.sleep(time_to_relock)
+
+    def stop(self):
+        # unlock all locked items
+        now = time()
+
+        for lock_object in self.renew_list:
+            locked_at = lock_object["locked_at"]
+
+            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
+                self.db.set_one(
+                    lock_object["table"],
+                    update_dict={"locked_at": 0},
+                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+                    fail_on_empty=False,
+                )
+
+
 class VimAdminThread(threading.Thread):
 class VimAdminThread(threading.Thread):
-    MAX_TIME_LOCKED = 3600  # 1h
     MAX_TIME_UNATTENDED = 600  # 10min
     TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
     kafka_topics = ("vim_account", "wim_account", "sdn")
     MAX_TIME_UNATTENDED = 600  # 10min
     TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
     kafka_topics = ("vim_account", "wim_account", "sdn")
@@ -62,16 +186,25 @@ class VimAdminThread(threading.Thread):
         self.last_rotask_time = 0
         self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
         self.logger = logging.getLogger("ro.vimadmin")
         self.last_rotask_time = 0
         self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
         self.logger = logging.getLogger("ro.vimadmin")
-        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
-        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
+        # asyncio task for receiving vim actions from kafka bus
+        self.aiomain_task_kafka = None
+        # asyncio task for watching ro_tasks not processed by nobody
+        self.aiomain_task_vim = None
+        self.aiomain_task_renew_lock = None
+        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
+        self.lock_renew = LockRenew(config, self.logger)
+        self.task_locked_time = config["global"]["task_locked_time"]
 
     async def vim_watcher(self):
 
     async def vim_watcher(self):
-        """ Reads database periodically looking for tasks not processed by nobody because of a reboot
+        """Reads database periodically looking for tasks not processed by nobody because of a reboot
         in order to load this vim"""
         # firstly read VIMS not processed
         for target_database in ("vim_accounts", "wim_accounts", "sdns"):
         in order to load this vim"""
         # firstly read VIMS not processed
         for target_database in ("vim_accounts", "wim_accounts", "sdns"):
-            unattended_targets = self.db.get_list(target_database,
-                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
+            unattended_targets = self.db.get_list(
+                target_database,
+                q_filter={"_admin.operations.operationState": "PROCESSING"},
+            )
+
             for target in unattended_targets:
                 target_id = "{}:{}".format(target_database[:3], target["_id"])
                 self.logger.info("ordered to check {}".format(target_id))
             for target in unattended_targets:
                 target_id = "{}:{}".format(target_database[:3], target["_id"])
                 self.logger.info("ordered to check {}".format(target_id))
@@ -80,78 +213,128 @@ class VimAdminThread(threading.Thread):
         while not self.to_terminate:
             now = time()
             processed_vims = []
         while not self.to_terminate:
             now = time()
             processed_vims = []
+
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
-            ro_tasks = self.db.get_list("ro_tasks",
-                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
-                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                                  "locked_at.lt": now - self.MAX_TIME_LOCKED,
-                                                  "to_check_at.gt": self.last_rotask_time,
-                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
+
+            ro_tasks = self.db.get_list(
+                "ro_tasks",
+                q_filter={
+                    "target_id.ncont": self.engine.get_assigned_vims(),
+                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+                    "locked_at.lt": now - self.task_locked_time,
+                    "to_check_at.gt": self.last_rotask_time,
+                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
+                },
+            )
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
+
             for ro_task in ro_tasks:
                 # if already checked ignore
                 if ro_task["target_id"] in processed_vims:
                     continue
             for ro_task in ro_tasks:
                 # if already checked ignore
                 if ro_task["target_id"] in processed_vims:
                     continue
+
                 processed_vims.append(ro_task["target_id"])
                 processed_vims.append(ro_task["target_id"])
+
                 # if already assigned ignore
                 if ro_task["target_id"] in self.engine.get_assigned_vims():
                     continue
                 # if already assigned ignore
                 if ro_task["target_id"] in self.engine.get_assigned_vims():
                     continue
+
                 # if there is some task locked on this VIM, there is an RO working on it, so ignore
                 # if there is some task locked on this VIM, there is an RO working on it, so ignore
-                if self.db.get_list("ro_tasks",
-                                    q_filter={"target_id": ro_task["target_id"],
-                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                              "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+                if self.db.get_list(
+                    "ro_tasks",
+                    q_filter={
+                        "target_id": ro_task["target_id"],
+                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+                        "locked_at.gt": now - self.task_locked_time,
+                    },
+                ):
                     continue
                     continue
+
                 # unattended, assign vim
                 self.engine.assign_vim(ro_task["target_id"])
                 # unattended, assign vim
                 self.engine.assign_vim(ro_task["target_id"])
-                self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+                self.logger.debug(
+                    "ordered to load {}. Inactivity detected".format(
+                        ro_task["target_id"]
+                    )
+                )
 
             # every 2 hours check if there are vims without any ro_task and unload it
             if now > self.next_check_unused_vim:
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
 
             # every 2 hours check if there are vims without any ro_task and unload it
             if now > self.next_check_unused_vim:
                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                 self.engine.unload_unused_vims()
-            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
+
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED)
 
     async def aiomain(self):
         kafka_working = True
         while not self.to_terminate:
             try:
                 if not self.aiomain_task_kafka:
 
     async def aiomain(self):
         kafka_working = True
         while not self.to_terminate:
             try:
                 if not self.aiomain_task_kafka:
-                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
-                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
+                    for kafka_topic in self.kafka_topics:
+                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
                     kafka_working = True
                     self.logger.debug("Starting vim_account subscription task")
                     self.aiomain_task_kafka = asyncio.ensure_future(
-                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
-                                         aiocallback=self._msg_callback),
-                        loop=self.loop)
+                        self.msg.aioread(
+                            self.kafka_topics,
+                            group_id=False,
+                            aiocallback=self._msg_callback,
+                        ),
+                    )
+
                 if not self.aiomain_task_vim:
                 if not self.aiomain_task_vim:
-                    self.aiomain_task_vim = asyncio.ensure_future(
-                        self.vim_watcher(),
-                        loop=self.loop)
-                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
-                                             timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())
+
+                if not self.aiomain_task_renew_lock:
+                    self.aiomain_task_renew_lock = asyncio.ensure_future(
+                        self.lock_renew.renew_locks()
+                    )
+
+                done, _ = await asyncio.wait(
+                    [
+                        self.aiomain_task_kafka,
+                        self.aiomain_task_vim,
+                        self.aiomain_task_renew_lock,
+                    ],
+                    timeout=None,
+                    return_when=asyncio.FIRST_COMPLETED,
+                )
+
                 try:
                     if self.aiomain_task_kafka in done:
                         exc = self.aiomain_task_kafka.exception()
                 try:
                     if self.aiomain_task_kafka in done:
                         exc = self.aiomain_task_kafka.exception()
-                        self.logger.error("kafka subscription task exception: {}".format(exc))
+                        self.logger.error(
+                            "kafka subscription task exception: {}".format(exc)
+                        )
                         self.aiomain_task_kafka = None
                         self.aiomain_task_kafka = None
+
                     if self.aiomain_task_vim in done:
                         exc = self.aiomain_task_vim.exception()
                     if self.aiomain_task_vim in done:
                         exc = self.aiomain_task_vim.exception()
-                        self.logger.error("vim_account watcher task exception: {}".format(exc))
+                        self.logger.error(
+                            "vim_account watcher task exception: {}".format(exc)
+                        )
                         self.aiomain_task_vim = None
                         self.aiomain_task_vim = None
+
+                    if self.aiomain_task_renew_lock in done:
+                        exc = self.aiomain_task_renew_lock.exception()
+                        self.logger.error("renew_locks task exception: {}".format(exc))
+                        self.aiomain_task_renew_lock = None
                 except asyncio.CancelledError:
                 except asyncio.CancelledError:
-                    pass
+                    self.logger.exception("asyncio.CancelledError occured.")
 
             except Exception as e:
                 if self.to_terminate:
                     return
 
             except Exception as e:
                 if self.to_terminate:
                     return
+
                 if kafka_working:
                     # logging only first time
                 if kafka_working:
                     # logging only first time
-                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+                    self.logger.critical(
+                        "Error accessing kafka '{}'. Retrying ...".format(e)
+                    )
                     kafka_working = False
                     kafka_working = False
-            await asyncio.sleep(10, loop=self.loop)
+
+            await asyncio.sleep(10)
 
     def run(self):
         """
 
     def run(self):
         """
@@ -168,11 +351,17 @@ class VimAdminThread(threading.Thread):
                     self.db = dbmemory.DbMemory()
                     self.db.db_connect(self.config["database"])
                 else:
                     self.db = dbmemory.DbMemory()
                     self.db.db_connect(self.config["database"])
                 else:
-                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
-                        self.config["database"]["driver"]))
+                    raise VimAdminException(
+                        "Invalid configuration param '{}' at '[database]':'driver'".format(
+                            self.config["database"]["driver"]
+                        )
+                    )
+
+            self.lock_renew.start(self.db)
+
             if not self.msg:
                 config_msg = self.config["message"].copy()
             if not self.msg:
                 config_msg = self.config["message"].copy()
-                config_msg["loop"] = self.loop
+
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
                     self.msg.connect(config_msg)
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
                     self.msg.connect(config_msg)
@@ -180,25 +369,32 @@ class VimAdminThread(threading.Thread):
                     self.msg = msgkafka.MsgKafka()
                     self.msg.connect(config_msg)
                 else:
                     self.msg = msgkafka.MsgKafka()
                     self.msg.connect(config_msg)
                 else:
-                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
-                        config_msg["driver"]))
+                    raise VimAdminException(
+                        "Invalid configuration param '{}' at '[message]':'driver'".format(
+                            config_msg["driver"]
+                        )
+                    )
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
         self.logger.info("Starting")
         while not self.to_terminate:
             try:
-                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
-            # except asyncio.CancelledError:
-            #     break  # if cancelled it should end, breaking loop
+                asyncio.run(self.main_task())
             except Exception as e:
                 if not self.to_terminate:
             except Exception as e:
                 if not self.to_terminate:
-                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+                    self.logger.exception(
+                        "Exception '{}' at messaging read loop".format(e), exc_info=True
+                    )
 
         self.logger.info("Finishing")
         self._stop()
         self.loop.close()
 
 
         self.logger.info("Finishing")
         self._stop()
         self.loop.close()
 
+    async def main_task(self):
+        task = asyncio.ensure_future(self.aiomain())
+        await task
+
     async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
     async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
@@ -210,9 +406,11 @@ class VimAdminThread(threading.Thread):
         try:
             if command == "echo":
                 return
         try:
             if command == "echo":
                 return
+
             if topic in self.kafka_topics:
             if topic in self.kafka_topics:
-                target = topic[0:3]   # vim, wim or sdn
+                target = topic[0:3]  # vim, wim or sdn
                 target_id = target + ":" + params["_id"]
                 target_id = target + ":" + params["_id"]
+
                 if command in ("edited", "edit"):
                     self.engine.reload_vim(target_id)
                     self.logger.debug("ordered to reload {}".format(target_id))
                 if command in ("edited", "edit"):
                     self.engine.reload_vim(target_id)
                     self.logger.debug("ordered to reload {}".format(target_id))
@@ -222,12 +420,19 @@ class VimAdminThread(threading.Thread):
                 elif command in ("create", "created"):
                     self.engine.check_vim(target_id)
                     self.logger.debug("ordered to check {}".format(target_id))
                 elif command in ("create", "created"):
                     self.engine.check_vim(target_id)
                     self.logger.debug("ordered to check {}".format(target_id))
-
-        except (NsException, DbException, MsgException) as e:
-            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+        except (DbException, MsgException) as e:
+            self.logger.error(
+                "Error while processing topic={} command={}: {}".format(
+                    topic, command, e
+                )
+            )
         except Exception as e:
         except Exception as e:
-            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
-                                  exc_info=True)
+            self.logger.exception(
+                "Exception while processing topic={} command={}: {}".format(
+                    topic, command, e
+                ),
+                exc_info=True,
+            )
 
     def _stop(self):
         """
 
     def _stop(self):
         """
@@ -237,6 +442,7 @@ class VimAdminThread(threading.Thread):
         try:
             if self.db:
                 self.db.db_disconnect()
         try:
             if self.db:
                 self.db.db_disconnect()
+
             if self.msg:
                 self.msg.disconnect()
         except (DbException, MsgException) as e:
             if self.msg:
                 self.msg.disconnect()
         except (DbException, MsgException) as e:
@@ -249,7 +455,15 @@ class VimAdminThread(threading.Thread):
         :return: None
         """
         self.to_terminate = True
         :return: None
         """
         self.to_terminate = True
+        self.lock_renew.to_terminate = True
+
         if self.aiomain_task_kafka:
         if self.aiomain_task_kafka:
-            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel())
+
         if self.aiomain_task_vim:
         if self.aiomain_task_vim:
-            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel())
+
+        if self.aiomain_task_renew_lock:
+            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel())
+
+        self.lock_renew.stop()