X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=f967f83248cd3edfd76294c7d19248d450781122;hp=c7c5464bf6cab2bec65b4325e277fb343811f88e;hb=f1b640f418cb13cfb19c080236254445a174e5ab;hpb=8615352fd9db8a157462ad848e37260d4f3468d2 diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index c7c5464b..f967f832 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -32,11 +32,10 @@ import yaml from pkg_resources import iter_entry_points # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version from osm_common.dbbase import DbException -# from osm_common.fsbase import FsException -# from osm_common.msgbase import MsgException from osm_ro_plugin.vim_dummy import VimDummyConnector from osm_ro_plugin.sdn_dummy import SdnDummyConnector from osm_ro_plugin import vimconn, sdnconn +from osm_ng_ro.vim_admin import LockRenew from copy import deepcopy from unittest.mock import Mock from http import HTTPStatus @@ -779,10 +778,8 @@ class NsWorker(threading.Thread): REFRESH_ERROR = 600 REFRESH_IMAGE = 3600 * 10 REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 2000 + QUEUE_SIZE = 100 terminate = False - MAX_TIME_LOCKED = 3600 - MAX_TIME_VIM_LOCKED = 120 def __init__(self, worker_index, config, plugins, db): """ @@ -814,6 +811,7 @@ class NsWorker(threading.Thread): self.time_last_task_processed = None self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db self.idle = True # it is idle when there are not vim_targets associated + self.task_locked_time = config["global"]["task_locked_time"] def insert_task(self, task): try: @@ -915,7 +913,7 @@ class NsWorker(threading.Thread): if operation["operationState"] != "PROCESSING": continue locked_at = operation.get("locked_at") - if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED: + if locked_at is not None and locked_at >= now - self.task_locked_time: # some other thread is doing this operation return # lock @@ -1053,7 +1051,7 @@ class NsWorker(threading.Thread): "ro_tasks", q_filter={"target_id": self.vim_targets, "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], - "locked_at.lt": now - self.MAX_TIME_LOCKED, + "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed}, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False) @@ -1209,13 +1207,14 @@ class NsWorker(threading.Thread): ro_task["vim_info"]["refresh_at"] = next_refresh try: - # 0 get task_status_create + # 0: get task_status_create + lock_object = None task_status_create = None task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and t["status"] in ("BUILD", "DONE")), None) if task_create: task_status_create = task_create["status"] - # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD + # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD for task_action in ("DELETE", "CREATE", "EXEC"): db_vim_update = None new_status = None @@ -1256,7 +1255,10 @@ class NsWorker(threading.Thread): if dependency_not_completed: # TODO set at vim_info.vim_details that it is waiting continue - + # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew + # the task of renew this locking. It will update database locket_at periodically + if not lock_object: + lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self) if task["action"] == "DELETE": new_status, db_vim_info_update = self._delete_task(ro_task, task_index, task_depends, db_ro_task_update) @@ -1324,12 +1326,19 @@ class NsWorker(threading.Thread): self.logger.error("Unexpected exception at _update_target task={}: {}". format(task["task_id"], e), exc_info=True) - q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]} + locked_at = ro_task["locked_at"] + if lock_object: + locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time] + # locked_at contains two times to avoid race condition. In case the lock has been renew, it will + # contain exactly locked_at + self.task_locked_time + LockRenew.remove_lock_object(lock_object) + q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at} # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, # outside this task (by ro_nbi) do not update it db_ro_task_update["locked_by"] = None # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked - db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED) + db_ro_task_update["locked_at"] = int(now - self.task_locked_time) + db_ro_task_update["modified_at"] = now db_ro_task_update["to_check_at"] = next_check_at if not self.db.set_one("ro_tasks", update_dict=db_ro_task_update,