From: tierno Date: Wed, 9 Dec 2020 15:06:01 +0000 (+0000) Subject: fix 1386: enhance on lock procedure X-Git-Tag: v9.1.0~7 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F48%2F10148%2F1;p=osm%2FRO.git fix 1386: enhance on lock procedure locking of ro_tasks for HA enhancements: - lock time is configurable by ro.cfg or ENV (OSMRO_SERVER_TASK_LOCKED_TIME) - lock time is reduced to 10min by default - lock is renewed when needed by a new asyncio task at vim_admin Change-Id: Icae02803e57713aca2f028884142fa152ee3d032 Signed-off-by: tierno --- diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index c7c5464b..f967f832 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -32,11 +32,10 @@ import yaml from pkg_resources import iter_entry_points # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version from osm_common.dbbase import DbException -# from osm_common.fsbase import FsException -# from osm_common.msgbase import MsgException from osm_ro_plugin.vim_dummy import VimDummyConnector from osm_ro_plugin.sdn_dummy import SdnDummyConnector from osm_ro_plugin import vimconn, sdnconn +from osm_ng_ro.vim_admin import LockRenew from copy import deepcopy from unittest.mock import Mock from http import HTTPStatus @@ -779,10 +778,8 @@ class NsWorker(threading.Thread): REFRESH_ERROR = 600 REFRESH_IMAGE = 3600 * 10 REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 2000 + QUEUE_SIZE = 100 terminate = False - MAX_TIME_LOCKED = 3600 - MAX_TIME_VIM_LOCKED = 120 def __init__(self, worker_index, config, plugins, db): """ @@ -814,6 +811,7 @@ class NsWorker(threading.Thread): self.time_last_task_processed = None self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db self.idle = True # it is idle when there are not vim_targets associated + self.task_locked_time = config["global"]["task_locked_time"] def insert_task(self, task): try: @@ -915,7 +913,7 @@ class NsWorker(threading.Thread): if operation["operationState"] != "PROCESSING": continue locked_at = operation.get("locked_at") - if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED: + if locked_at is not None and locked_at >= now - self.task_locked_time: # some other thread is doing this operation return # lock @@ -1053,7 +1051,7 @@ class NsWorker(threading.Thread): "ro_tasks", q_filter={"target_id": self.vim_targets, "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], - "locked_at.lt": now - self.MAX_TIME_LOCKED, + "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed}, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False) @@ -1209,13 +1207,14 @@ class NsWorker(threading.Thread): ro_task["vim_info"]["refresh_at"] = next_refresh try: - # 0 get task_status_create + # 0: get task_status_create + lock_object = None task_status_create = None task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and t["status"] in ("BUILD", "DONE")), None) if task_create: task_status_create = task_create["status"] - # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD + # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD for task_action in ("DELETE", "CREATE", "EXEC"): db_vim_update = None new_status = None @@ -1256,7 +1255,10 @@ class NsWorker(threading.Thread): if dependency_not_completed: # TODO set at vim_info.vim_details that it is waiting continue - + # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew + # the task of renew this locking. It will update database locket_at periodically + if not lock_object: + lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self) if task["action"] == "DELETE": new_status, db_vim_info_update = self._delete_task(ro_task, task_index, task_depends, db_ro_task_update) @@ -1324,12 +1326,19 @@ class NsWorker(threading.Thread): self.logger.error("Unexpected exception at _update_target task={}: {}". format(task["task_id"], e), exc_info=True) - q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]} + locked_at = ro_task["locked_at"] + if lock_object: + locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time] + # locked_at contains two times to avoid race condition. In case the lock has been renew, it will + # contain exactly locked_at + self.task_locked_time + LockRenew.remove_lock_object(lock_object) + q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at} # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, # outside this task (by ro_nbi) do not update it db_ro_task_update["locked_by"] = None # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked - db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED) + db_ro_task_update["locked_at"] = int(now - self.task_locked_time) + db_ro_task_update["modified_at"] = now db_ro_task_update["to_check_at"] = next_check_at if not self.db.set_one("ro_tasks", update_dict=db_ro_task_update, diff --git a/NG-RO/osm_ng_ro/ro.cfg b/NG-RO/osm_ng_ro/ro.cfg index 808fc0ad..a2c78ecd 100644 --- a/NG-RO/osm_ng_ro/ro.cfg +++ b/NG-RO/osm_ng_ro/ro.cfg @@ -57,6 +57,11 @@ log.error_file: "" log.level: "DEBUG" #log.file: /var/log/osm/ro.log +# time a ro_task at database remain locked, before expiring it must be re-locked with a write at database +task_locked_time: 600 +task_max_locked_time: 7200 # lock is renewed until this maximum time +task_relock_time: 30 # 30s before expiring lock time, it is re-locked again + [database] # use env OSMRO_DATABASE_XXX to override diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py index 5af766c7..e843c80a 100644 --- a/NG-RO/osm_ng_ro/vim_admin.py +++ b/NG-RO/osm_ng_ro/vim_admin.py @@ -27,7 +27,6 @@ from http import HTTPStatus from osm_common import dbmongo, dbmemory, msglocal, msgkafka from osm_common.dbbase import DbException from osm_common.msgbase import MsgException -from osm_ng_ro.ns import NsException from time import time __author__ = "Alfonso Tierno " @@ -40,8 +39,103 @@ class VimAdminException(Exception): Exception.__init__(self, message) +class LockRenew: + + renew_list = [] + # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to + # be renewed. The time order is achieved as it is appended at the end + + def __init__(self, config, logger): + """ + Constructor of class + :param config: configuration parameters of database and messaging + """ + self.config = config + self.logger = logger + self.to_terminate = False + self.loop = None + self.db = None + self.task_locked_time = config["global"]["task_locked_time"] + self.task_relock_time = config["global"]["task_relock_time"] + self.task_max_locked_time = config["global"]["task_max_locked_time"] + + def start(self, db, loop): + self.db = db + self.loop = loop + + @staticmethod + def add_lock_object(database_table, database_object, thread_object): + """ + Insert a task to renew the locking + :param database_table: database collection where to maintain the lock + :param database_object: database object. '_id' and 'locked_at' are mandatory keys + :param thread_object: Thread object that has locked to check if it is alive + :return: a locked_object needed for calling remove_lock_object. It will contain uptodya database 'locked_at' + """ + lock_object = { + "table": database_table, + "_id": database_object["_id"], + "initial_lock_time": database_object["locked_at"], + "locked_at": database_object["locked_at"], + "thread": thread_object, + "unlocked": False # True when it is not needed any more + } + LockRenew.renew_list.append(lock_object) + return lock_object + + @staticmethod + def remove_lock_object(lock_object): + lock_object["unlocked"] = True + + async def renew_locks(self): + while not self.to_terminate: + if not self.renew_list: + await asyncio.sleep(self.task_locked_time - self.task_relock_time, loop=self.loop) + continue + lock_object = self.renew_list[0] + if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive(): + # task has been finished or locker thread is dead, not needed to re-locked. + self.renew_list.pop(0) + continue + + locked_at = lock_object["locked_at"] + now = time() + time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now + if time_to_relock < 1: + if lock_object["initial_lock_time"] + self.task_max_locked_time < now: + self.renew_list.pop(0) + # re-lock + new_locked_at = locked_at + self.task_locked_time + try: + if self.db.set_one(lock_object["table"], + update_dict={"locked_at": new_locked_at, "modified_at": now}, + q_filter={"_id": lock_object["_id"], "locked_at": locked_at}, + fail_on_empty=False): + self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"])) + lock_object["locked_at"] = new_locked_at + self.renew_list.append(lock_object) + else: + self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"], + lock_object["_id"])) + except Exception as e: + self.logger.error("Exception when trying to renew lock for {}.{}: {}".format( + lock_object["table"], lock_object["_id"], e)) + else: + # wait until it is time to re-lock it + await asyncio.sleep(time_to_relock, loop=self.loop) + + def stop(self): + # unlock all locked items + now = time() + for lock_object in self.renew_list: + locked_at = lock_object["locked_at"] + if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now: + self.db.set_one(lock_object["table"], update_dict={"locked_at": 0}, + q_filter={"_id": lock_object["_id"], "locked_at": locked_at}, + fail_on_empty=False) + + class VimAdminThread(threading.Thread): - MAX_TIME_LOCKED = 3600 # 1h MAX_TIME_UNATTENDED = 600 # 10min TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h kafka_topics = ("vim_account", "wim_account", "sdn") @@ -64,6 +158,10 @@ class VimAdminThread(threading.Thread): self.logger = logging.getLogger("ro.vimadmin") self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody + self.aiomain_task_renew_lock = None + # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order + self.lock_renew = LockRenew(config, self.logger) + self.task_locked_time = config["global"]["task_locked_time"] async def vim_watcher(self): """ Reads database periodically looking for tasks not processed by nobody because of a reboot @@ -85,7 +183,7 @@ class VimAdminThread(threading.Thread): ro_tasks = self.db.get_list("ro_tasks", q_filter={"target_id.ncont": self.engine.get_assigned_vims(), "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], - "locked_at.lt": now - self.MAX_TIME_LOCKED, + "locked_at.lt": now - self.task_locked_time, "to_check_at.gt": self.last_rotask_time, "to_check_at.lte": now - self.MAX_TIME_UNATTENDED}) self.last_rotask_time = now - self.MAX_TIME_UNATTENDED @@ -101,7 +199,7 @@ class VimAdminThread(threading.Thread): if self.db.get_list("ro_tasks", q_filter={"target_id": ro_task["target_id"], "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], - "locked_at.gt": now - self.MAX_TIME_LOCKED}): + "locked_at.gt": now - self.task_locked_time}): continue # unattended, assign vim self.engine.assign_vim(ro_task["target_id"]) @@ -130,8 +228,12 @@ class VimAdminThread(threading.Thread): self.aiomain_task_vim = asyncio.ensure_future( self.vim_watcher(), loop=self.loop) - done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim], - timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED) + if not self.aiomain_task_renew_lock: + self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop) + + done, _ = await asyncio.wait( + [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock], + timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED) try: if self.aiomain_task_kafka in done: exc = self.aiomain_task_kafka.exception() @@ -141,6 +243,10 @@ class VimAdminThread(threading.Thread): exc = self.aiomain_task_vim.exception() self.logger.error("vim_account watcher task exception: {}".format(exc)) self.aiomain_task_vim = None + if self.aiomain_task_renew_lock in done: + exc = self.aiomain_task_renew_lock.exception() + self.logger.error("renew_locks task exception: {}".format(exc)) + self.aiomain_task_renew_lock = None except asyncio.CancelledError: pass @@ -170,6 +276,8 @@ class VimAdminThread(threading.Thread): else: raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format( self.config["database"]["driver"])) + self.lock_renew.start(self.db, self.loop) + if not self.msg: config_msg = self.config["message"].copy() config_msg["loop"] = self.loop @@ -223,7 +331,7 @@ class VimAdminThread(threading.Thread): self.engine.check_vim(target_id) self.logger.debug("ordered to check {}".format(target_id)) - except (NsException, DbException, MsgException) as e: + except (DbException, MsgException) as e: self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e)) except Exception as e: self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e), @@ -249,7 +357,11 @@ class VimAdminThread(threading.Thread): :return: None """ self.to_terminate = True + self.lock_renew.to_terminate = True if self.aiomain_task_kafka: self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel) if self.aiomain_task_vim: self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel) + if self.aiomain_task_renew_lock: + self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel) + self.lock_renew.stop()