from pkg_resources import iter_entry_points
# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
-# from osm_common.fsbase import FsException
-# from osm_common.msgbase import MsgException
from osm_ro_plugin.vim_dummy import VimDummyConnector
from osm_ro_plugin.sdn_dummy import SdnDummyConnector
from osm_ro_plugin import vimconn, sdnconn
+from osm_ng_ro.vim_admin import LockRenew
from copy import deepcopy
from unittest.mock import Mock
from http import HTTPStatus
REFRESH_ERROR = 600
REFRESH_IMAGE = 3600 * 10
REFRESH_DELETE = 3600 * 10
- QUEUE_SIZE = 2000
- # TODO delete assigment_lock = Lock()
+ QUEUE_SIZE = 100
terminate = False
- # TODO delete assignment = {}
- MAX_TIME_LOCKED = 3600
- MAX_TIME_VIM_LOCKED = 120
def __init__(self, worker_index, config, plugins, db):
"""
}
self.time_last_task_processed = None
self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+ self.idle = True # it is idle when there are not vim_targets associated
+ self.task_locked_time = config["global"]["task_locked_time"]
def insert_task(self, task):
try:
:return: None.
"""
try:
- target, _, _id = target_id.partition(":")
self.db_vims.pop(target_id, None)
self.my_vims.pop(target_id, None)
- self.vim_targets.remove(target_id)
+ if target_id in self.vim_targets:
+ self.vim_targets.remove(target_id)
+ self.logger.info("Unloaded {}".format(target_id))
rmtree("{}:{}".format(target_id, self.worker_index))
except FileNotFoundError:
pass # this is raised by rmtree if folder does not exist
unset_dict = {}
op_text = ""
step = ""
- loaded = target_id in self.my_vims
+ loaded = target_id in self.vim_targets
target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
try:
step = "Getting {} from db".format(target_id)
if operation["operationState"] != "PROCESSING":
continue
locked_at = operation.get("locked_at")
- if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+ if locked_at is not None and locked_at >= now - self.task_locked_time:
# some other thread is doing this operation
return
# lock
"ro_tasks",
q_filter={"target_id": self.vim_targets,
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.lt": now - self.MAX_TIME_LOCKED,
+ "locked_at.lt": now - self.task_locked_time,
"to_check_at.lt": self.time_last_task_processed},
update_dict={"locked_by": self.my_id, "locked_at": now},
fail_on_empty=False)
ro_task["vim_info"]["refresh_at"] = next_refresh
try:
- # 0 get task_status_create
+ # 0: get task_status_create
+ lock_object = None
task_status_create = None
task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
t["status"] in ("BUILD", "DONE")), None)
if task_create:
task_status_create = task_create["status"]
- # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
+ # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
for task_action in ("DELETE", "CREATE", "EXEC"):
db_vim_update = None
new_status = None
if dependency_not_completed:
# TODO set at vim_info.vim_details that it is waiting
continue
-
+ # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
+ # the task of renew this locking. It will update database locket_at periodically
+ if not lock_object:
+ lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self)
if task["action"] == "DELETE":
new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
task_depends, db_ro_task_update)
self.logger.error("Unexpected exception at _update_target task={}: {}".
format(task["task_id"], e), exc_info=True)
- q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+ locked_at = ro_task["locked_at"]
+ if lock_object:
+ locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time]
+ # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
+ # contain exactly locked_at + self.task_locked_time
+ LockRenew.remove_lock_object(lock_object)
+ q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at}
# modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
# outside this task (by ro_nbi) do not update it
db_ro_task_update["locked_by"] = None
# locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
- db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+ db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
+ db_ro_task_update["modified_at"] = now
db_ro_task_update["to_check_at"] = next_check_at
if not self.db.set_one("ro_tasks",
update_dict=db_ro_task_update,
def run(self):
# load database
- self.logger.debug("Starting")
+ self.logger.info("Starting")
while True:
# step 1: get commands from queue
try:
- task = self.task_queue.get(block=False if self.my_vims else True)
+ if self.vim_targets:
+ task = self.task_queue.get(block=False)
+ else:
+ if not self.idle:
+ self.logger.debug("enters in idle state")
+ self.idle = True
+ task = self.task_queue.get(block=True)
+ self.idle = False
+
if task[0] == "terminate":
break
elif task[0] == "load_vim":
+ self.logger.info("order to load vim {}".format(task[1]))
self._load_vim(task[1])
elif task[0] == "unload_vim":
+ self.logger.info("order to unload vim {}".format(task[1]))
self._unload_vim(task[1])
elif task[0] == "reload_vim":
self._reload_vim(task[1])
elif task[0] == "check_vim":
+ self.logger.info("order to check vim {}".format(task[1]))
self._check_vim(task[1])
continue
except Exception as e:
except Exception as e:
self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
- self.logger.debug("Finishing")
+ self.logger.info("Finishing")