fix 1386: enhance on lock procedure
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
index 013cae9..f967f83 100644 (file)
@@ -32,11 +32,10 @@ import yaml
 from pkg_resources import iter_entry_points
 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
 from osm_common.dbbase import DbException
-# from osm_common.fsbase import FsException
-# from osm_common.msgbase import MsgException
 from osm_ro_plugin.vim_dummy import VimDummyConnector
 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
 from osm_ro_plugin import vimconn, sdnconn
+from osm_ng_ro.vim_admin import LockRenew
 from copy import deepcopy
 from unittest.mock import Mock
 from http import HTTPStatus
@@ -779,12 +778,8 @@ class NsWorker(threading.Thread):
     REFRESH_ERROR = 600
     REFRESH_IMAGE = 3600 * 10
     REFRESH_DELETE = 3600 * 10
-    QUEUE_SIZE = 2000
-    # TODO delete assigment_lock = Lock()
+    QUEUE_SIZE = 100
     terminate = False
-    # TODO delete assignment = {}
-    MAX_TIME_LOCKED = 3600
-    MAX_TIME_VIM_LOCKED = 120
 
     def __init__(self, worker_index, config, plugins, db):
         """
@@ -815,6 +810,8 @@ class NsWorker(threading.Thread):
         }
         self.time_last_task_processed = None
         self.tasks_to_delete = []  # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+        self.idle = True  # it is idle when there are not vim_targets associated
+        self.task_locked_time = config["global"]["task_locked_time"]
 
     def insert_task(self, task):
         try:
@@ -884,10 +881,11 @@ class NsWorker(threading.Thread):
         :return: None.
         """
         try:
-            target, _, _id = target_id.partition(":")
             self.db_vims.pop(target_id, None)
             self.my_vims.pop(target_id, None)
-            self.vim_targets.remove(target_id)
+            if target_id in self.vim_targets:
+                self.vim_targets.remove(target_id)
+            self.logger.info("Unloaded {}".format(target_id))
             rmtree("{}:{}".format(target_id, self.worker_index))
         except FileNotFoundError:
             pass  # this is raised by rmtree if folder does not exist
@@ -906,7 +904,7 @@ class NsWorker(threading.Thread):
         unset_dict = {}
         op_text = ""
         step = ""
-        loaded = target_id in self.my_vims
+        loaded = target_id in self.vim_targets
         target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
         try:
             step = "Getting {} from db".format(target_id)
@@ -915,7 +913,7 @@ class NsWorker(threading.Thread):
                 if operation["operationState"] != "PROCESSING":
                     continue
                 locked_at = operation.get("locked_at")
-                if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+                if locked_at is not None and locked_at >= now - self.task_locked_time:
                     # some other thread is doing this operation
                     return
                 # lock
@@ -1053,7 +1051,7 @@ class NsWorker(threading.Thread):
                     "ro_tasks",
                     q_filter={"target_id": self.vim_targets,
                               "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
-                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                              "locked_at.lt": now - self.task_locked_time,
                               "to_check_at.lt": self.time_last_task_processed},
                     update_dict={"locked_by": self.my_id, "locked_at": now},
                     fail_on_empty=False)
@@ -1209,19 +1207,21 @@ class NsWorker(threading.Thread):
             ro_task["vim_info"]["refresh_at"] = next_refresh
 
         try:
-            # 0 get task_status_create
+            # 0: get task_status_create
+            lock_object = None
             task_status_create = None
             task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
                                 t["status"] in ("BUILD", "DONE")), None)
             if task_create:
                 task_status_create = task_create["status"]
-            # 1. look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
+            # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
             for task_action in ("DELETE", "CREATE", "EXEC"):
                 db_vim_update = None
                 new_status = None
                 for task_index, task in enumerate(ro_task["tasks"]):
                     if not task:
                         continue  # task deleted
+                    task_depends = {}
                     target_update = None
                     if (task_action in ("DELETE", "EXEC") and task["status"] not in ("SCHEDULED", "BUILD")) or \
                             task["action"] != task_action or \
@@ -1231,7 +1231,6 @@ class NsWorker(threading.Thread):
                     try:
                         db_vim_info_update = None
                         if task["status"] == "SCHEDULED":
-                            task_depends = {}
                             # check if tasks that this depends on have been completed
                             dependency_not_completed = False
                             for dependency_task_id in (task.get("depends_on") or ()):
@@ -1256,7 +1255,10 @@ class NsWorker(threading.Thread):
                             if dependency_not_completed:
                                 # TODO set at vim_info.vim_details that it is waiting
                                 continue
-
+                        # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
+                        # the task of renewing this lock. It will update database locked_at periodically
+                        if not lock_object:
+                            lock_object = LockRenew.add_lock_object("ro_tasks", ro_task, self)
                         if task["action"] == "DELETE":
                             new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                                task_depends, db_ro_task_update)
@@ -1324,12 +1326,19 @@ class NsWorker(threading.Thread):
                             self.logger.error("Unexpected exception at _update_target task={}: {}".
                                               format(task["task_id"], e), exc_info=True)
 
-            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+            locked_at = ro_task["locked_at"]
+            if lock_object:
+                locked_at = [lock_object["locked_at"], lock_object["locked_at"] + self.task_locked_time]
+                # locked_at contains two times to avoid a race condition. In case the lock has been renewed, it will
+                # contain exactly locked_at + self.task_locked_time
+                LockRenew.remove_lock_object(lock_object)
+            q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": locked_at}
             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
             # outside this task (by ro_nbi) do not update it
             db_ro_task_update["locked_by"] = None
             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
-            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+            db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
+            db_ro_task_update["modified_at"] = now
             db_ro_task_update["to_check_at"] = next_check_at
             if not self.db.set_one("ro_tasks",
                                    update_dict=db_ro_task_update,
@@ -1451,20 +1460,31 @@ class NsWorker(threading.Thread):
 
     def run(self):
         # load database
-        self.logger.debug("Starting")
+        self.logger.info("Starting")
         while True:
             # step 1: get commands from queue
             try:
-                task = self.task_queue.get(block=False if self.my_vims else True)
+                if self.vim_targets:
+                    task = self.task_queue.get(block=False)
+                else:
+                    if not self.idle:
+                        self.logger.debug("enters in idle state")
+                    self.idle = True
+                    task = self.task_queue.get(block=True)
+                    self.idle = False
+
                 if task[0] == "terminate":
                     break
                 elif task[0] == "load_vim":
+                    self.logger.info("order to load vim {}".format(task[1]))
                     self._load_vim(task[1])
                 elif task[0] == "unload_vim":
+                    self.logger.info("order to unload vim {}".format(task[1]))
                     self._unload_vim(task[1])
                 elif task[0] == "reload_vim":
                     self._reload_vim(task[1])
                 elif task[0] == "check_vim":
+                    self.logger.info("order to check vim {}".format(task[1]))
                     self._check_vim(task[1])
                 continue
             except Exception as e:
@@ -1487,4 +1507,4 @@ class NsWorker(threading.Thread):
             except Exception as e:
                 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
 
-        self.logger.debug("Finishing")
+        self.logger.info("Finishing")