from pkg_resources import iter_entry_points
# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
-# from osm_common.fsbase import FsException
-# from osm_common.msgbase import MsgException
from osm_ro_plugin.vim_dummy import VimDummyConnector
from osm_ro_plugin.sdn_dummy import SdnDummyConnector
from osm_ro_plugin import vimconn, sdnconn
+from osm_ng_ro.vim_admin import LockRenew
from copy import deepcopy
from unittest.mock import Mock
from http import HTTPStatus
REFRESH_ERROR = 600
REFRESH_IMAGE = 3600 * 10
REFRESH_DELETE = 3600 * 10
- QUEUE_SIZE = 2000
+ QUEUE_SIZE = 100
terminate = False
- MAX_TIME_LOCKED = 3600
- MAX_TIME_VIM_LOCKED = 120
def __init__(self, worker_index, config, plugins, db):
"""
self.time_last_task_processed = None
self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db
self.idle = True # it is idle when there are not vim_targets associated
+ self.task_locked_time = config["global"]["task_locked_time"]
def insert_task(self, task):
try:
if operation["operationState"] != "PROCESSING":
continue
locked_at = operation.get("locked_at")
- if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+ if locked_at is not None and locked_at >= now - self.task_locked_time:
# some other thread is doing this operation
return
# lock
"ro_tasks",
q_filter={"target_id": self.vim_targets,
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.lt": now - self.MAX_TIME_LOCKED,
+ "locked_at.lt": now - self.task_locked_time,
"to_check_at.lt": self.time_last_task_processed},
update_dict={"locked_by": self.my_id, "locked_at": now},
fail_on_empty=False)
ro_task["vim_info"]["refresh_at"] = next_refresh
try:
- # 0 get task_status_create
+ # 0: get task_status_create
+ lock_object = None
task_status_create = None
task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
t["status"] in ("BUILD", "DONE")), None)
if task_create:
task_status_create = task_create["status"]
- # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
+ # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
for task_action in ("DELETE", "CREATE", "EXEC"):
db_vim_update = None
new_status = None
if dependency_not_completed:
# TODO set at vim_info.vim_details that it is waiting
continue
-
+ # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
+ # the task of renew this locking. It will update database locket_at periodically
+ if not lock_object:
+ lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self)
if task["action"] == "DELETE":
new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
task_depends, db_ro_task_update)
self.logger.error("Unexpected exception at _update_target task={}: {}".
format(task["task_id"], e), exc_info=True)
- q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+ locked_at = ro_task["locked_at"]
+ if lock_object:
+ locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time]
+ # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
+ # contain exactly locked_at + self.task_locked_time
+ LockRenew.remove_lock_object(lock_object)
+ q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at}
# modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
# outside this task (by ro_nbi) do not update it
db_ro_task_update["locked_by"] = None
# locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
- db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+ db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
+ db_ro_task_update["modified_at"] = now
db_ro_task_update["to_check_at"] = next_check_at
if not self.db.set_one("ro_tasks",
update_dict=db_ro_task_update,