fix 1385: enhance unload vim from ns_thread when not needed 80/10080/6
authortierno <alfonso.tiernosepulveda@telefonica.com>
Sun, 6 Dec 2020 18:27:16 +0000 (18:27 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 18 Dec 2020 14:13:43 +0000 (14:13 +0000)
control of threads in in idle status (without vims to process)
and reuse it when needed

Change-Id: Ib7c5023eec18229fb86a1632b63aca5aef8d2a14
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
NG-RO/osm_ng_ro/ns.py
NG-RO/osm_ng_ro/ns_thread.py
NG-RO/osm_ng_ro/vim_admin.py

index aa1040d..f621051 100644 (file)
@@ -87,8 +87,7 @@ class Ns(object):
         # If done now it will not be linked to parent not getting its handler and level
         self.map_topic = {}
         self.write_lock = None
-        self.assignment = {}
-        self.assignment_list = []
+        self.vims_assigned = {}
         self.next_worker = 0
         self.plugins = {}
         self.workers = []
@@ -150,6 +149,9 @@ class Ns(object):
             self.write_lock = Lock()
         except (DbException, FsException, MsgException) as e:
             raise NsException(str(e), http_code=e.http_code)
+    
+    def get_assigned_vims(self):
+        return list(self.vims_assigned.keys())
 
     def stop(self):
         try:
@@ -165,12 +167,17 @@ class Ns(object):
         for worker in self.workers:
             worker.insert_task(("terminate",))
 
-    def _create_worker(self, target_id, load=True):
-        # Look for a thread not alive
-        worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
-        if worker_id:
-            # re-start worker
-            self.workers[worker_id].start()
+    def _create_worker(self):
+        """
+        Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the
+        limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread
+        return the index of the assigned worker thread. Worker threads are storead at self.workers
+        """
+        # Look for a thread in idle status
+        worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None)
+        if worker_id is not None:
+            # unset idle status to avoid race conditions
+            self.workers[worker_id].idle = False
         else:
             worker_id = len(self.workers)
             if worker_id < self.config["global"]["server.ns_threads"]:
@@ -181,44 +188,60 @@ class Ns(object):
                 # reached maximum number of threads, assign VIM to an existing one
                 worker_id = self.next_worker
                 self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"]
-        if load:
-            self.workers[worker_id].insert_task(("load_vim", target_id))
         return worker_id
 
     def assign_vim(self, target_id):
-        if target_id not in self.assignment:
-            self.assignment[target_id] = self._create_worker(target_id)
-            self.assignment_list.append(target_id)
+        with self.write_lock:
+            return self._assign_vim(target_id)
+
+    def _assign_vim(self, target_id):
+        if target_id not in self.vims_assigned:
+            worker_id = self.vims_assigned[target_id] = self._create_worker()
+            self.workers[worker_id].insert_task(("load_vim", target_id))
 
     def reload_vim(self, target_id):
         # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
         # this is because database VIM information is cached for threads working with SDN
-        # if target_id in self.assignment:
-        #     worker_id = self.assignment[target_id]
-        #     self.workers[worker_id].insert_task(("reload_vim", target_id))
-        for worker in self.workers:
-            if worker.is_alive():
-                worker.insert_task(("reload_vim", target_id))
+        with self.write_lock:
+            for worker in self.workers:
+                if worker and not worker.idle:
+                    worker.insert_task(("reload_vim", target_id))
 
     def unload_vim(self, target_id):
-        if target_id in self.assignment:
-            worker_id = self.assignment[target_id]
+        with self.write_lock:
+            return self._unload_vim(target_id)
+
+    def _unload_vim(self, target_id):
+        if target_id in self.vims_assigned:
+            worker_id = self.vims_assigned[target_id]
             self.workers[worker_id].insert_task(("unload_vim", target_id))
-            del self.assignment[target_id]
-            self.assignment_list.remove(target_id)
+            del self.vims_assigned[target_id]
 
     def check_vim(self, target_id):
-        if target_id in self.assignment:
-            worker_id = self.assignment[target_id]
-        else:
-            worker_id = self._create_worker(target_id, load=False)
+        with self.write_lock:
+            if target_id in self.vims_assigned:
+                worker_id = self.vims_assigned[target_id]
+            else:
+                worker_id = self._create_worker()
 
         worker = self.workers[worker_id]
         worker.insert_task(("check_vim", target_id))
 
+    def unload_unused_vims(self):
+        with self.write_lock:
+            vims_to_unload = []
+            for target_id in self.vims_assigned:
+                if not self.db.get_one("ro_tasks",
+                                       q_filter={"target_id": target_id,
+                                                 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']},
+                                       fail_on_empty=False):
+                    vims_to_unload.append(target_id)
+            for target_id in vims_to_unload:
+                self._unload_vim(target_id)
+
     def _get_cloud_init(self, where):
         """
-
+        Not used as cloud init content is provided in the http body. This method reads cloud init from a file
         :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
         :return:
         """
@@ -286,7 +309,7 @@ class Ns(object):
             },
             "public_key": public_key,
             "private_key": private_key_encrypted,
-            "actions": [],
+            "actions": []
         }
         self.db.create("ro_nsrs", db_content)
         return db_content
@@ -638,7 +661,7 @@ class Ns(object):
                             target_viminfo = None
                         if target_viminfo is None:
                             # must be deleted
-                            self.assign_vim(target_vim)
+                            self._assign_vim(target_vim)
                             target_record_id = "{}.{}".format(db_record, existing_item["id"])
                             item_ = item
                             if target_vim.startswith("sdn"):
@@ -683,7 +706,7 @@ class Ns(object):
                             target_record_id += ".sdn"
                         extra_dict = process_params(target_item, target_viminfo, target_record_id)
 
-                        self.assign_vim(target_vim)
+                        self._assign_vim(target_vim)
                         task = _create_task(
                             target_vim, item_, "CREATE",
                             target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
@@ -716,7 +739,7 @@ class Ns(object):
                             if not vdur:
                                 raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
                             target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items())
-                            self.assign_vim(target_vim)
+                            self._assign_vim(target_vim)
                             target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
                             extra_dict = {
                                 "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
index 0ec8452..c7c5464 100644 (file)
@@ -780,9 +780,7 @@ class NsWorker(threading.Thread):
     REFRESH_IMAGE = 3600 * 10
     REFRESH_DELETE = 3600 * 10
     QUEUE_SIZE = 2000
-    # TODO delete assigment_lock = Lock()
     terminate = False
-    # TODO delete assignment = {}
     MAX_TIME_LOCKED = 3600
     MAX_TIME_VIM_LOCKED = 120
 
@@ -815,6 +813,7 @@ class NsWorker(threading.Thread):
         }
         self.time_last_task_processed = None
         self.tasks_to_delete = []  # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+        self.idle = True  # it is idle when there are not vim_targets associated
 
     def insert_task(self, task):
         try:
@@ -884,10 +883,11 @@ class NsWorker(threading.Thread):
         :return: None.
         """
         try:
-            target, _, _id = target_id.partition(":")
             self.db_vims.pop(target_id, None)
             self.my_vims.pop(target_id, None)
-            self.vim_targets.remove(target_id)
+            if target_id in self.vim_targets:
+                self.vim_targets.remove(target_id)
+            self.logger.info("Unloaded {}".format(target_id))
             rmtree("{}:{}".format(target_id, self.worker_index))
         except FileNotFoundError:
             pass  # this is raised by rmtree if folder does not exist
@@ -906,7 +906,7 @@ class NsWorker(threading.Thread):
         unset_dict = {}
         op_text = ""
         step = ""
-        loaded = target_id in self.my_vims
+        loaded = target_id in self.vim_targets
         target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
         try:
             step = "Getting {} from db".format(target_id)
@@ -1451,20 +1451,31 @@ class NsWorker(threading.Thread):
 
     def run(self):
         # load database
-        self.logger.debug("Starting")
+        self.logger.info("Starting")
         while True:
             # step 1: get commands from queue
             try:
-                task = self.task_queue.get(block=False if self.my_vims else True)
+                if self.vim_targets:
+                    task = self.task_queue.get(block=False)
+                else:
+                    if not self.idle:
+                        self.logger.debug("enters in idle state")
+                    self.idle = True
+                    task = self.task_queue.get(block=True)
+                    self.idle = False
+
                 if task[0] == "terminate":
                     break
                 elif task[0] == "load_vim":
+                    self.logger.info("order to load vim {}".format(task[1]))
                     self._load_vim(task[1])
                 elif task[0] == "unload_vim":
+                    self.logger.info("order to unload vim {}".format(task[1]))
                     self._unload_vim(task[1])
                 elif task[0] == "reload_vim":
                     self._reload_vim(task[1])
                 elif task[0] == "check_vim":
+                    self.logger.info("order to check vim {}".format(task[1]))
                     self._check_vim(task[1])
                 continue
             except Exception as e:
@@ -1487,4 +1498,4 @@ class NsWorker(threading.Thread):
             except Exception as e:
                 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
 
-        self.logger.debug("Finishing")
+        self.logger.info("Finishing")
index d7a7c1d..5af766c 100644 (file)
@@ -42,7 +42,8 @@ class VimAdminException(Exception):
 
 class VimAdminThread(threading.Thread):
     MAX_TIME_LOCKED = 3600  # 1h
-    MAX_TIME_UNATTENDED = 60  # 600  # 10min
+    MAX_TIME_UNATTENDED = 600  # 10min
+    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
     kafka_topics = ("vim_account", "wim_account", "sdn")
 
     def __init__(self, config, engine):
@@ -59,30 +60,58 @@ class VimAdminThread(threading.Thread):
         self.engine = engine
         self.loop = None
         self.last_rotask_time = 0
+        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
         self.logger = logging.getLogger("ro.vimadmin")
         self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
         self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
 
     async def vim_watcher(self):
-        """ Reads database periodically looking for tasks not processed by nobody because of a restar
+        """ Reads database periodically looking for tasks not processed by nobody because of a reboot
         in order to load this vim"""
+        # firstly read VIMS not processed
+        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
+            unattended_targets = self.db.get_list(target_database,
+                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
+            for target in unattended_targets:
+                target_id = "{}:{}".format(target_database[:3], target["_id"])
+                self.logger.info("ordered to check {}".format(target_id))
+                self.engine.check_vim(target_id)
+
         while not self.to_terminate:
             now = time()
+            processed_vims = []
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
             ro_tasks = self.db.get_list("ro_tasks",
-                                        q_filter={"target_id.ncont": self.engine.assignment_list,
+                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                                                   "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                                   "locked_at.lt": now - self.MAX_TIME_LOCKED,
                                                   "to_check_at.gt": self.last_rotask_time,
                                                   "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
             for ro_task in ro_tasks:
-                if ro_task["target_id"] not in self.engine.assignment_list:
-                    self.engine.assign_vim(ro_task["target_id"])
-                    self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
-
-            await asyncio.sleep(300, loop=self.loop)
+                # if already checked ignore
+                if ro_task["target_id"] in processed_vims:
+                    continue
+                processed_vims.append(ro_task["target_id"])
+                # if already assigned ignore
+                if ro_task["target_id"] in self.engine.get_assigned_vims():
+                    continue
+                # if there is some task locked on this VIM, there is an RO working on it, so ignore
+                if self.db.get_list("ro_tasks",
+                                    q_filter={"target_id": ro_task["target_id"],
+                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                              "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+                    continue
+                # unattended, assign vim
+                self.engine.assign_vim(ro_task["target_id"])
+                self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+
+            # every 2 hours check if there are vims without any ro_task and unload it
+            if now > self.next_check_unused_vim:
+                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
+                self.engine.unload_unused_vims()
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
 
     async def aiomain(self):
         kafka_working = True
@@ -156,7 +185,7 @@ class VimAdminThread(threading.Thread):
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
-        self.logger.debug("Starting")
+        self.logger.info("Starting")
         while not self.to_terminate:
             try:
                 self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
@@ -166,7 +195,7 @@ class VimAdminThread(threading.Thread):
                 if not self.to_terminate:
                     self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
 
-        self.logger.debug("Finishing")
+        self.logger.info("Finishing")
         self._stop()
         self.loop.close()