fix 1386: enhance on lock procedure
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
index c7c5464..f967f83 100644 (file)
@@ -32,11 +32,10 @@ import yaml
 from pkg_resources import iter_entry_points
 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
 from osm_common.dbbase import DbException
 from pkg_resources import iter_entry_points
 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
 from osm_common.dbbase import DbException
-# from osm_common.fsbase import FsException
-# from osm_common.msgbase import MsgException
 from osm_ro_plugin.vim_dummy import VimDummyConnector
 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
 from osm_ro_plugin import vimconn, sdnconn
 from osm_ro_plugin.vim_dummy import VimDummyConnector
 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
 from osm_ro_plugin import vimconn, sdnconn
+from osm_ng_ro.vim_admin import LockRenew
 from copy import deepcopy
 from unittest.mock import Mock
 from http import HTTPStatus
 from copy import deepcopy
 from unittest.mock import Mock
 from http import HTTPStatus
@@ -779,10 +778,8 @@ class NsWorker(threading.Thread):
     REFRESH_ERROR = 600
     REFRESH_IMAGE = 3600 * 10
     REFRESH_DELETE = 3600 * 10
     REFRESH_ERROR = 600
     REFRESH_IMAGE = 3600 * 10
     REFRESH_DELETE = 3600 * 10
-    QUEUE_SIZE = 2000
+    QUEUE_SIZE = 100
     terminate = False
     terminate = False
-    MAX_TIME_LOCKED = 3600
-    MAX_TIME_VIM_LOCKED = 120
 
     def __init__(self, worker_index, config, plugins, db):
         """
 
     def __init__(self, worker_index, config, plugins, db):
         """
@@ -814,6 +811,7 @@ class NsWorker(threading.Thread):
         self.time_last_task_processed = None
         self.tasks_to_delete = []  # lists of tasks to delete because nsrs or vnfrs has been deleted from db
         self.idle = True  # it is idle when there are not vim_targets associated
         self.time_last_task_processed = None
         self.tasks_to_delete = []  # lists of tasks to delete because nsrs or vnfrs has been deleted from db
         self.idle = True  # it is idle when there are not vim_targets associated
+        self.task_locked_time = config["global"]["task_locked_time"]
 
     def insert_task(self, task):
         try:
 
     def insert_task(self, task):
         try:
@@ -915,7 +913,7 @@ class NsWorker(threading.Thread):
                 if operation["operationState"] != "PROCESSING":
                     continue
                 locked_at = operation.get("locked_at")
                 if operation["operationState"] != "PROCESSING":
                     continue
                 locked_at = operation.get("locked_at")
-                if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+                if locked_at is not None and locked_at >= now - self.task_locked_time:
                     # some other thread is doing this operation
                     return
                 # lock
                     # some other thread is doing this operation
                     return
                 # lock
@@ -1053,7 +1051,7 @@ class NsWorker(threading.Thread):
                     "ro_tasks",
                     q_filter={"target_id": self.vim_targets,
                               "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                     "ro_tasks",
                     q_filter={"target_id": self.vim_targets,
                               "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                              "locked_at.lt": now - self.task_locked_time,
                               "to_check_at.lt": self.time_last_task_processed},
                     update_dict={"locked_by": self.my_id, "locked_at": now},
                     fail_on_empty=False)
                               "to_check_at.lt": self.time_last_task_processed},
                     update_dict={"locked_by": self.my_id, "locked_at": now},
                     fail_on_empty=False)
@@ -1209,13 +1207,14 @@ class NsWorker(threading.Thread):
             ro_task["vim_info"]["refresh_at"] = next_refresh
 
         try:
             ro_task["vim_info"]["refresh_at"] = next_refresh
 
         try:
-            # 0 get task_status_create
+            # 0: get task_status_create
+            lock_object = None
             task_status_create = None
             task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
                                 t["status"] in ("BUILD", "DONE")), None)
             if task_create:
                 task_status_create = task_create["status"]
             task_status_create = None
             task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
                                 t["status"] in ("BUILD", "DONE")), None)
             if task_create:
                 task_status_create = task_create["status"]
-            # 1. look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
+            # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
             for task_action in ("DELETE", "CREATE", "EXEC"):
                 db_vim_update = None
                 new_status = None
             for task_action in ("DELETE", "CREATE", "EXEC"):
                 db_vim_update = None
                 new_status = None
@@ -1256,7 +1255,10 @@ class NsWorker(threading.Thread):
                             if dependency_not_completed:
                                 # TODO set at vim_info.vim_details that it is waiting
                                 continue
                             if dependency_not_completed:
                                 # TODO set at vim_info.vim_details that it is waiting
                                 continue
-
+                        # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
+                        # the task of renew this locking. It will update database locket_at periodically
+                        if not lock_object:
+                            lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self)
                         if task["action"] == "DELETE":
                             new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                                task_depends, db_ro_task_update)
                         if task["action"] == "DELETE":
                             new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                                task_depends, db_ro_task_update)
@@ -1324,12 +1326,19 @@ class NsWorker(threading.Thread):
                             self.logger.error("Unexpected exception at _update_target task={}: {}".
                                               format(task["task_id"], e), exc_info=True)
 
                             self.logger.error("Unexpected exception at _update_target task={}: {}".
                                               format(task["task_id"], e), exc_info=True)
 
-            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+            locked_at = ro_task["locked_at"]
+            if lock_object:
+                locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time]
+                # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
+                # contain exactly locked_at + self.task_locked_time
+                LockRenew.remove_lock_object(lock_object)
+            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at}
             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
             # outside this task (by ro_nbi) do not update it
             db_ro_task_update["locked_by"] = None
             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
             # outside this task (by ro_nbi) do not update it
             db_ro_task_update["locked_by"] = None
             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
-            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+            db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
+            db_ro_task_update["modified_at"] = now
             db_ro_task_update["to_check_at"] = next_check_at
             if not self.db.set_one("ro_tasks",
                                    update_dict=db_ro_task_update,
             db_ro_task_update["to_check_at"] = next_check_at
             if not self.db.set_one("ro_tasks",
                                    update_dict=db_ro_task_update,