fix 1386: enhance on lock procedure 91/10091/3
authortierno <alfonso.tiernosepulveda@telefonica.com>
Wed, 9 Dec 2020 15:06:01 +0000 (15:06 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 18 Dec 2020 14:14:39 +0000 (14:14 +0000)
locking of ro_tasks for HA enhancements:
- lock time is configurable by ro.cfg or ENV
(OSMRO_SERVER_TASK_LOCKED_TIME)
- lock time is reduced to 10min by default
- lock is renewed when needed by a new asyncio task at vim_admin

Change-Id: Icae02803e57713aca2f028884142fa152ee3d032
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
NG-RO/osm_ng_ro/ns_thread.py
NG-RO/osm_ng_ro/ro.cfg
NG-RO/osm_ng_ro/vim_admin.py

index c7c5464..f967f83 100644 (file)
@@ -32,11 +32,10 @@ import yaml
 from pkg_resources import iter_entry_points
 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
 from osm_common.dbbase import DbException
-# from osm_common.fsbase import FsException
-# from osm_common.msgbase import MsgException
 from osm_ro_plugin.vim_dummy import VimDummyConnector
 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
 from osm_ro_plugin import vimconn, sdnconn
+from osm_ng_ro.vim_admin import LockRenew
 from copy import deepcopy
 from unittest.mock import Mock
 from http import HTTPStatus
@@ -779,10 +778,8 @@ class NsWorker(threading.Thread):
     REFRESH_ERROR = 600
     REFRESH_IMAGE = 3600 * 10
     REFRESH_DELETE = 3600 * 10
-    QUEUE_SIZE = 2000
+    QUEUE_SIZE = 100
     terminate = False
-    MAX_TIME_LOCKED = 3600
-    MAX_TIME_VIM_LOCKED = 120
 
     def __init__(self, worker_index, config, plugins, db):
         """
@@ -814,6 +811,7 @@ class NsWorker(threading.Thread):
         self.time_last_task_processed = None
         self.tasks_to_delete = []  # lists of tasks to delete because nsrs or vnfrs has been deleted from db
         self.idle = True  # it is idle when there are not vim_targets associated
+        self.task_locked_time = config["global"]["task_locked_time"]
 
     def insert_task(self, task):
         try:
@@ -915,7 +913,7 @@ class NsWorker(threading.Thread):
                 if operation["operationState"] != "PROCESSING":
                     continue
                 locked_at = operation.get("locked_at")
-                if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+                if locked_at is not None and locked_at >= now - self.task_locked_time:
                     # some other thread is doing this operation
                     return
                 # lock
@@ -1053,7 +1051,7 @@ class NsWorker(threading.Thread):
                     "ro_tasks",
                     q_filter={"target_id": self.vim_targets,
                               "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                              "locked_at.lt": now - self.task_locked_time,
                               "to_check_at.lt": self.time_last_task_processed},
                     update_dict={"locked_by": self.my_id, "locked_at": now},
                     fail_on_empty=False)
@@ -1209,13 +1207,14 @@ class NsWorker(threading.Thread):
             ro_task["vim_info"]["refresh_at"] = next_refresh
 
         try:
-            # 0 get task_status_create
+            # 0: get task_status_create
+            lock_object = None
             task_status_create = None
             task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
                                 t["status"] in ("BUILD", "DONE")), None)
             if task_create:
                 task_status_create = task_create["status"]
-            # 1. look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
+            # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
             for task_action in ("DELETE", "CREATE", "EXEC"):
                 db_vim_update = None
                 new_status = None
@@ -1256,7 +1255,10 @@ class NsWorker(threading.Thread):
                             if dependency_not_completed:
                                 # TODO set at vim_info.vim_details that it is waiting
                                 continue
-
+                        # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
+                        # the task of renew this locking. It will update database locket_at periodically
+                        if not lock_object:
+                            lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self)
                         if task["action"] == "DELETE":
                             new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                                task_depends, db_ro_task_update)
@@ -1324,12 +1326,19 @@ class NsWorker(threading.Thread):
                             self.logger.error("Unexpected exception at _update_target task={}: {}".
                                               format(task["task_id"], e), exc_info=True)
 
-            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+            locked_at = ro_task["locked_at"]
+            if lock_object:
+                locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time]
+                # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
+                # contain exactly locked_at + self.task_locked_time
+                LockRenew.remove_lock_object(lock_object)
+            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at}
             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
             # outside this task (by ro_nbi) do not update it
             db_ro_task_update["locked_by"] = None
             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
-            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+            db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
+            db_ro_task_update["modified_at"] = now
             db_ro_task_update["to_check_at"] = next_check_at
             if not self.db.set_one("ro_tasks",
                                    update_dict=db_ro_task_update,
index 808fc0a..a2c78ec 100644 (file)
@@ -57,6 +57,11 @@ log.error_file: ""
 log.level: "DEBUG"
 #log.file: /var/log/osm/ro.log
 
+# time a ro_task at database remain locked, before expiring it must be re-locked with a write at database
+task_locked_time: 600
+task_max_locked_time: 7200  # lock is renewed until this maximum time
+task_relock_time: 30   # 30s before expiring lock time, it is re-locked again
+
 
 [database]
 # use env OSMRO_DATABASE_XXX to override
index 5af766c..e843c80 100644 (file)
@@ -27,7 +27,6 @@ from http import HTTPStatus
 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
-from osm_ng_ro.ns import NsException
 from time import time
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
@@ -40,8 +39,103 @@ class VimAdminException(Exception):
         Exception.__init__(self, message)
 
 
+class LockRenew:
+
+    renew_list = []
+    # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
+    # be renewed. The time order is achieved as it is appended at the end
+
+    def __init__(self, config, logger):
+        """
+        Constructor of class
+        :param config: configuration parameters of database and messaging
+        """
+        self.config = config
+        self.logger = logger
+        self.to_terminate = False
+        self.loop = None
+        self.db = None
+        self.task_locked_time = config["global"]["task_locked_time"]
+        self.task_relock_time = config["global"]["task_relock_time"]
+        self.task_max_locked_time = config["global"]["task_max_locked_time"]
+
+    def start(self, db, loop):
+        self.db = db
+        self.loop = loop
+
+    @staticmethod
+    def add_lock_object(database_table, database_object, thread_object):
+        """
+        Insert a task to renew the locking
+        :param database_table: database collection where to maintain the lock
+        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
+        :param thread_object: Thread object that has locked to check if it is alive
+        :return: a locked_object needed for calling remove_lock_object. It will contain uptodya database 'locked_at'
+        """
+        lock_object = {
+            "table": database_table,
+            "_id": database_object["_id"],
+            "initial_lock_time": database_object["locked_at"],
+            "locked_at": database_object["locked_at"],
+            "thread": thread_object,
+            "unlocked": False  # True when it is not needed any more
+        }
+        LockRenew.renew_list.append(lock_object)
+        return lock_object
+
+    @staticmethod
+    def remove_lock_object(lock_object):
+        lock_object["unlocked"] = True
+
+    async def renew_locks(self):
+        while not self.to_terminate:
+            if not self.renew_list:
+                await asyncio.sleep(self.task_locked_time - self.task_relock_time, loop=self.loop)
+                continue
+            lock_object = self.renew_list[0]
+            if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
+                # task has been finished or locker thread is dead, not needed to re-locked.
+                self.renew_list.pop(0)
+                continue
+
+            locked_at = lock_object["locked_at"]
+            now = time()
+            time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
+            if time_to_relock < 1:
+                if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
+                    self.renew_list.pop(0)
+                    # re-lock
+                    new_locked_at = locked_at + self.task_locked_time
+                    try:
+                        if self.db.set_one(lock_object["table"],
+                                           update_dict={"locked_at": new_locked_at, "modified_at": now},
+                                           q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+                                           fail_on_empty=False):
+                            self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
+                            lock_object["locked_at"] = new_locked_at
+                            self.renew_list.append(lock_object)
+                        else:
+                            self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
+                                                                                  lock_object["_id"]))
+                    except Exception as e:
+                        self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
+                            lock_object["table"], lock_object["_id"], e))
+            else:
+                # wait until it is time to re-lock it
+                await asyncio.sleep(time_to_relock, loop=self.loop)
+
+    def stop(self):
+        # unlock all locked items
+        now = time()
+        for lock_object in self.renew_list:
+            locked_at = lock_object["locked_at"]
+            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
+                self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
+                                q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+                                fail_on_empty=False)
+
+
 class VimAdminThread(threading.Thread):
-    MAX_TIME_LOCKED = 3600  # 1h
     MAX_TIME_UNATTENDED = 600  # 10min
     TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
     kafka_topics = ("vim_account", "wim_account", "sdn")
@@ -64,6 +158,10 @@ class VimAdminThread(threading.Thread):
         self.logger = logging.getLogger("ro.vimadmin")
         self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
         self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
+        self.aiomain_task_renew_lock = None
+        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
+        self.lock_renew = LockRenew(config, self.logger)
+        self.task_locked_time = config["global"]["task_locked_time"]
 
     async def vim_watcher(self):
         """ Reads database periodically looking for tasks not processed by nobody because of a reboot
@@ -85,7 +183,7 @@ class VimAdminThread(threading.Thread):
             ro_tasks = self.db.get_list("ro_tasks",
                                         q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                                                   "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                                  "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                                                  "locked_at.lt": now - self.task_locked_time,
                                                   "to_check_at.gt": self.last_rotask_time,
                                                   "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
@@ -101,7 +199,7 @@ class VimAdminThread(threading.Thread):
                 if self.db.get_list("ro_tasks",
                                     q_filter={"target_id": ro_task["target_id"],
                                               "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                                              "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+                                              "locked_at.gt": now - self.task_locked_time}):
                     continue
                 # unattended, assign vim
                 self.engine.assign_vim(ro_task["target_id"])
@@ -130,8 +228,12 @@ class VimAdminThread(threading.Thread):
                     self.aiomain_task_vim = asyncio.ensure_future(
                         self.vim_watcher(),
                         loop=self.loop)
-                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
-                                             timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                if not self.aiomain_task_renew_lock:
+                    self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop)
+
+                done, _ = await asyncio.wait(
+                    [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
+                    timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
                 try:
                     if self.aiomain_task_kafka in done:
                         exc = self.aiomain_task_kafka.exception()
@@ -141,6 +243,10 @@ class VimAdminThread(threading.Thread):
                         exc = self.aiomain_task_vim.exception()
                         self.logger.error("vim_account watcher task exception: {}".format(exc))
                         self.aiomain_task_vim = None
+                    if self.aiomain_task_renew_lock in done:
+                        exc = self.aiomain_task_renew_lock.exception()
+                        self.logger.error("renew_locks task exception: {}".format(exc))
+                        self.aiomain_task_renew_lock = None
                 except asyncio.CancelledError:
                     pass
 
@@ -170,6 +276,8 @@ class VimAdminThread(threading.Thread):
                 else:
                     raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
                         self.config["database"]["driver"]))
+            self.lock_renew.start(self.db, self.loop)
+
             if not self.msg:
                 config_msg = self.config["message"].copy()
                 config_msg["loop"] = self.loop
@@ -223,7 +331,7 @@ class VimAdminThread(threading.Thread):
                     self.engine.check_vim(target_id)
                     self.logger.debug("ordered to check {}".format(target_id))
 
-        except (NsException, DbException, MsgException) as e:
+        except (DbException, MsgException) as e:
             self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
         except Exception as e:
             self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
@@ -249,7 +357,11 @@ class VimAdminThread(threading.Thread):
         :return: None
         """
         self.to_terminate = True
+        self.lock_renew.to_terminate = True
         if self.aiomain_task_kafka:
             self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
         if self.aiomain_task_vim:
             self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+        if self.aiomain_task_renew_lock:
+            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+        self.lock_renew.stop()