from pkg_resources import iter_entry_points
# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
-# from osm_common.fsbase import FsException
-# from osm_common.msgbase import MsgException
from osm_ro_plugin.vim_dummy import VimDummyConnector
from osm_ro_plugin.sdn_dummy import SdnDummyConnector
from osm_ro_plugin import vimconn, sdnconn
+from osm_ng_ro.vim_admin import LockRenew
from copy import deepcopy
from unittest.mock import Mock
from http import HTTPStatus
REFRESH_ERROR = 600
REFRESH_IMAGE = 3600 * 10
REFRESH_DELETE = 3600 * 10
- QUEUE_SIZE = 2000
+ QUEUE_SIZE = 100
terminate = False
- MAX_TIME_LOCKED = 3600
- MAX_TIME_VIM_LOCKED = 120
def __init__(self, worker_index, config, plugins, db):
"""
self.time_last_task_processed = None
self.tasks_to_delete = [] # list of tasks to delete because nsrs or vnfrs have been deleted from db
self.idle = True # it is idle when there are no vim_targets associated
+ self.task_locked_time = config["global"]["task_locked_time"]
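+ # ^ seconds a database lock on an ro_task is honored; older locks are treated as stale by the filters below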
def insert_task(self, task):
try:
if operation["operationState"] != "PROCESSING":
continue
locked_at = operation.get("locked_at")
- if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+ if locked_at is not None and locked_at >= now - self.task_locked_time:
# some other thread is doing this operation
return
# lock
"ro_tasks",
q_filter={"target_id": self.vim_targets,
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.lt": now - self.MAX_TIME_LOCKED,
+ "locked_at.lt": now - self.task_locked_time,
"to_check_at.lt": self.time_last_task_processed},
update_dict={"locked_by": self.my_id, "locked_at": now},
fail_on_empty=False)
ro_task["vim_info"]["refresh_at"] = next_refresh
try:
- # 0 get task_status_create
+ # 0: get task_status_create
+ lock_object = None
task_status_create = None
task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
t["status"] in ("BUILD", "DONE")), None)
if task_create:
task_status_create = task_create["status"]
- # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
+ # 1: look for tasks in status SCHEDULED, or in status BUILD/DONE if their action is CREATE
for task_action in ("DELETE", "CREATE", "EXEC"):
db_vim_update = None
new_status = None
if dependency_not_completed:
# TODO set at vim_info.vim_details that it is waiting
continue
-
+ # before calling the VIM plugin, which can take longer than task_locked_time, register with LockRenew
+ # the task of renewing this lock. It will periodically update 'locked_at' in the database
+ if not lock_object:
+ lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self)
if task["action"] == "DELETE":
new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
task_depends, db_ro_task_update)
self.logger.error("Unexpected exception at _update_target task={}: {}".
format(task["task_id"], e), exc_info=True)
- q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+ locked_at = ro_task["locked_at"]
+ if lock_object:
+ locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time]
+ # locked_at contains two times to avoid a race condition: in case the lock has been renewed, it will
+ # contain exactly locked_at + self.task_locked_time
+ LockRenew.remove_lock_object(lock_object)
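+ # (so the filter below matches whether or not the lock was renewed in the meantime)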
+ q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at}
# modify own task. Filter also by to_check_at to handle the race condition: if to_check_at has been modified
# outside this task (by ro_nbi), do not update it
db_ro_task_update["locked_by"] = None
# locked_at converted to int only for debugging. When it has no decimals it means it has been unlocked
- db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+ db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
+ db_ro_task_update["modified_at"] = now
db_ro_task_update["to_check_at"] = next_check_at
if not self.db.set_one("ro_tasks",
update_dict=db_ro_task_update,
from osm_common import dbmongo, dbmemory, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
-from osm_ng_ro.ns import NsException
from time import time
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
Exception.__init__(self, message)
+class LockRenew:
+
+ renew_list = []
+ # ^ class attribute, common for the whole RO. Time-ordered list of dictionaries with information on locks that
+ # need to be renewed. The time order is preserved because new entries are appended at the end
+
+ def __init__(self, config, logger):
+ """
+ Constructor of class
+ :param config: configuration parameters of database and messaging
+ :param logger: logging.Logger instance used for log messages
+ """
+ self.config = config
+ self.logger = logger
+ self.to_terminate = False
+ self.loop = None
+ self.db = None
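+ # timing parameters read from config["global"] (hypothetical example: task_locked_time=600,
+ # task_relock_time=30, task_max_locked_time=7200): task_locked_time is how long a database lock is
+ # honored before other workers may take over, task_relock_time is the margin before expiry at which
+ # the lock is renewed, and task_max_locked_time caps how long a lock keeps being renewed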
+ self.task_locked_time = config["global"]["task_locked_time"]
+ self.task_relock_time = config["global"]["task_relock_time"]
+ self.task_max_locked_time = config["global"]["task_max_locked_time"]
+
+ def start(self, db, loop):
+ self.db = db
+ self.loop = loop
+
+ @staticmethod
+ def add_lock_object(database_table, database_object, thread_object):
+ """
+ Insert a task to renew the locking
+ :param database_table: database collection where to maintain the lock
+ :param database_object: database object. '_id' and 'locked_at' are mandatory keys
+ :param thread_object: Thread object that owns the lock, used to check whether it is still alive
+ :return: a lock_object needed for calling remove_lock_object. It will contain the up-to-date database 'locked_at'
+ """
+ lock_object = {
+ "table": database_table,
+ "_id": database_object["_id"],
+ "initial_lock_time": database_object["locked_at"],
+ "locked_at": database_object["locked_at"],
+ "thread": thread_object,
+ "unlocked": False # True when it is not needed any more
+ }
+ LockRenew.renew_list.append(lock_object)
+ return lock_object
+
+ @staticmethod
+ def remove_lock_object(lock_object):
+ lock_object["unlocked"] = True
+
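+ # Minimal usage sketch (hypothetical caller code, for illustration only): a worker registers the lock
+ # before a long VIM plugin call and removes it afterwards:
+ #     lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, worker_thread)
+ #     try:
+ #         ...  # long-running VIM plugin call
+ #     finally:
+ #         LockRenew.remove_lock_object(lock_object)
+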
+ async def renew_locks(self):
+ while not self.to_terminate:
+ if not self.renew_list:
+ await asyncio.sleep(self.task_locked_time - self.task_relock_time, loop=self.loop)
+ continue
+ lock_object = self.renew_list[0]
+ if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
+ # task has finished or the locking thread is dead; no need to re-lock it
+ self.renew_list.pop(0)
+ continue
+
+ locked_at = lock_object["locked_at"]
+ now = time()
+ time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
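+ # e.g. with the hypothetical values task_locked_time=600 and task_relock_time=30, a lock taken at
+ # locked_at is renewed about 570 seconds later, 30 seconds before it would be considered stale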
+ if time_to_relock < 1:
+ if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
+ self.renew_list.pop(0)
+ # re-lock
+ new_locked_at = locked_at + self.task_locked_time
+ try:
+ if self.db.set_one(lock_object["table"],
+ update_dict={"locked_at": new_locked_at, "modified_at": now},
+ q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+ fail_on_empty=False):
+ self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
+ lock_object["locked_at"] = new_locked_at
+ self.renew_list.append(lock_object)
+ else:
+ self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
+ lock_object["_id"]))
+ except Exception as e:
+ self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
+ lock_object["table"], lock_object["_id"], e))
+ else:
+ # wait until it is time to re-lock it
+ await asyncio.sleep(time_to_relock, loop=self.loop)
+
+ def stop(self):
+ # unlock all locked items
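+ # setting locked_at to 0 makes the entry look stale to every "locked_at.lt" filter, so another worker
+ # can take it over immediately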
+ now = time()
+ for lock_object in self.renew_list:
+ locked_at = lock_object["locked_at"]
+ if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
+ self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
+ q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+ fail_on_empty=False)
+
+
class VimAdminThread(threading.Thread):
- MAX_TIME_LOCKED = 3600 # 1h
MAX_TIME_UNATTENDED = 600 # 10min
TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h
kafka_topics = ("vim_account", "wim_account", "sdn")
self.logger = logging.getLogger("ro.vimadmin")
self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus
self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by anybody
+ self.aiomain_task_renew_lock = None
+ # ^ asyncio task for keeping an ro_task locked when the VIM plugin takes too long processing an order
+ self.lock_renew = LockRenew(config, self.logger)
+ self.task_locked_time = config["global"]["task_locked_time"]
async def vim_watcher(self):
""" Reads database periodically looking for tasks not processed by nobody because of a reboot
ro_tasks = self.db.get_list("ro_tasks",
q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.lt": now - self.MAX_TIME_LOCKED,
+ "locked_at.lt": now - self.task_locked_time,
"to_check_at.gt": self.last_rotask_time,
"to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
if self.db.get_list("ro_tasks",
q_filter={"target_id": ro_task["target_id"],
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+ "locked_at.gt": now - self.task_locked_time}):
continue
# unattended, assign vim
self.engine.assign_vim(ro_task["target_id"])
self.aiomain_task_vim = asyncio.ensure_future(
self.vim_watcher(),
loop=self.loop)
- done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
- timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ if not self.aiomain_task_renew_lock:
+ self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop)
+
+ done, _ = await asyncio.wait(
+ [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
+ timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
try:
if self.aiomain_task_kafka in done:
exc = self.aiomain_task_kafka.exception()
exc = self.aiomain_task_vim.exception()
self.logger.error("vim_account watcher task exception: {}".format(exc))
self.aiomain_task_vim = None
+ if self.aiomain_task_renew_lock in done:
+ exc = self.aiomain_task_renew_lock.exception()
+ self.logger.error("renew_locks task exception: {}".format(exc))
+ self.aiomain_task_renew_lock = None
except asyncio.CancelledError:
pass
else:
raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
self.config["database"]["driver"]))
+ self.lock_renew.start(self.db, self.loop)
+
if not self.msg:
config_msg = self.config["message"].copy()
config_msg["loop"] = self.loop
self.engine.check_vim(target_id)
self.logger.debug("ordered to check {}".format(target_id))
- except (NsException, DbException, MsgException) as e:
+ except (DbException, MsgException) as e:
self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
except Exception as e:
self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
:return: None
"""
self.to_terminate = True
+ self.lock_renew.to_terminate = True
if self.aiomain_task_kafka:
self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
if self.aiomain_task_vim:
self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+ if self.aiomain_task_renew_lock:
+ self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+ self.lock_renew.stop()