fix 1385: enhance unload vim from ns_thread when not needed
[osm/RO.git] / NG-RO / osm_ng_ro / ns.py
index aa1040d..f621051 100644 (file)
@@ -87,8 +87,7 @@ class Ns(object):
         # If done now it will not be linked to parent not getting its handler and level
         self.map_topic = {}
         self.write_lock = None
-        self.assignment = {}
-        self.assignment_list = []
+        self.vims_assigned = {}
         self.next_worker = 0
         self.plugins = {}
         self.workers = []
@@ -150,6 +149,9 @@ class Ns(object):
             self.write_lock = Lock()
         except (DbException, FsException, MsgException) as e:
             raise NsException(str(e), http_code=e.http_code)
+    
+    def get_assigned_vims(self):
+        return list(self.vims_assigned.keys())
 
     def stop(self):
         try:
@@ -165,12 +167,17 @@ class Ns(object):
         for worker in self.workers:
             worker.insert_task(("terminate",))
 
-    def _create_worker(self, target_id, load=True):
-        # Look for a thread not alive
-        worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
-        if worker_id:
-            # re-start worker
-            self.workers[worker_id].start()
+    def _create_worker(self):
+        """
+        Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the
+        limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread
+        return the index of the assigned worker thread. Worker threads are storead at self.workers
+        """
+        # Look for a thread in idle status
+        worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None)
+        if worker_id is not None:
+            # unset idle status to avoid race conditions
+            self.workers[worker_id].idle = False
         else:
             worker_id = len(self.workers)
             if worker_id < self.config["global"]["server.ns_threads"]:
@@ -181,44 +188,60 @@ class Ns(object):
                 # reached maximum number of threads, assign VIM to an existing one
                 worker_id = self.next_worker
                 self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"]
-        if load:
-            self.workers[worker_id].insert_task(("load_vim", target_id))
         return worker_id
 
     def assign_vim(self, target_id):
-        if target_id not in self.assignment:
-            self.assignment[target_id] = self._create_worker(target_id)
-            self.assignment_list.append(target_id)
+        with self.write_lock:
+            return self._assign_vim(target_id)
+
+    def _assign_vim(self, target_id):
+        if target_id not in self.vims_assigned:
+            worker_id = self.vims_assigned[target_id] = self._create_worker()
+            self.workers[worker_id].insert_task(("load_vim", target_id))
 
     def reload_vim(self, target_id):
         # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
         # this is because database VIM information is cached for threads working with SDN
-        # if target_id in self.assignment:
-        #     worker_id = self.assignment[target_id]
-        #     self.workers[worker_id].insert_task(("reload_vim", target_id))
-        for worker in self.workers:
-            if worker.is_alive():
-                worker.insert_task(("reload_vim", target_id))
+        with self.write_lock:
+            for worker in self.workers:
+                if worker and not worker.idle:
+                    worker.insert_task(("reload_vim", target_id))
 
     def unload_vim(self, target_id):
-        if target_id in self.assignment:
-            worker_id = self.assignment[target_id]
+        with self.write_lock:
+            return self._unload_vim(target_id)
+
+    def _unload_vim(self, target_id):
+        if target_id in self.vims_assigned:
+            worker_id = self.vims_assigned[target_id]
             self.workers[worker_id].insert_task(("unload_vim", target_id))
-            del self.assignment[target_id]
-            self.assignment_list.remove(target_id)
+            del self.vims_assigned[target_id]
 
     def check_vim(self, target_id):
-        if target_id in self.assignment:
-            worker_id = self.assignment[target_id]
-        else:
-            worker_id = self._create_worker(target_id, load=False)
+        with self.write_lock:
+            if target_id in self.vims_assigned:
+                worker_id = self.vims_assigned[target_id]
+            else:
+                worker_id = self._create_worker()
 
         worker = self.workers[worker_id]
         worker.insert_task(("check_vim", target_id))
 
+    def unload_unused_vims(self):
+        with self.write_lock:
+            vims_to_unload = []
+            for target_id in self.vims_assigned:
+                if not self.db.get_one("ro_tasks",
+                                       q_filter={"target_id": target_id,
+                                                 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']},
+                                       fail_on_empty=False):
+                    vims_to_unload.append(target_id)
+            for target_id in vims_to_unload:
+                self._unload_vim(target_id)
+
     def _get_cloud_init(self, where):
         """
-
+        Not used as cloud init content is provided in the http body. This method reads cloud init from a file
         :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
         :return:
         """
@@ -286,7 +309,7 @@ class Ns(object):
             },
             "public_key": public_key,
             "private_key": private_key_encrypted,
-            "actions": [],
+            "actions": []
         }
         self.db.create("ro_nsrs", db_content)
         return db_content
@@ -638,7 +661,7 @@ class Ns(object):
                             target_viminfo = None
                         if target_viminfo is None:
                             # must be deleted
-                            self.assign_vim(target_vim)
+                            self._assign_vim(target_vim)
                             target_record_id = "{}.{}".format(db_record, existing_item["id"])
                             item_ = item
                             if target_vim.startswith("sdn"):
@@ -683,7 +706,7 @@ class Ns(object):
                             target_record_id += ".sdn"
                         extra_dict = process_params(target_item, target_viminfo, target_record_id)
 
-                        self.assign_vim(target_vim)
+                        self._assign_vim(target_vim)
                         task = _create_task(
                             target_vim, item_, "CREATE",
                             target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
@@ -716,7 +739,7 @@ class Ns(object):
                             if not vdur:
                                 raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
                             target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items())
-                            self.assign_vim(target_vim)
+                            self._assign_vim(target_vim)
                             target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
                             extra_dict = {
                                 "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],