X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns.py;h=f621051486ddf9aeb871e4284d83993be8c5c1d7;hb=refs%2Fchanges%2F80%2F10080%2F6;hp=aa1040d782c511a5b1ac7a928f530ff3f26e4778;hpb=55fa0bb312eebc5899abca6124b2114db1682f7b;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index aa1040d7..f6210514 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -87,8 +87,7 @@ class Ns(object): # If done now it will not be linked to parent not getting its handler and level self.map_topic = {} self.write_lock = None - self.assignment = {} - self.assignment_list = [] + self.vims_assigned = {} self.next_worker = 0 self.plugins = {} self.workers = [] @@ -150,6 +149,9 @@ class Ns(object): self.write_lock = Lock() except (DbException, FsException, MsgException) as e: raise NsException(str(e), http_code=e.http_code) + + def get_assigned_vims(self): + return list(self.vims_assigned.keys()) def stop(self): try: @@ -165,12 +167,17 @@ class Ns(object): for worker in self.workers: worker.insert_task(("terminate",)) - def _create_worker(self, target_id, load=True): - # Look for a thread not alive - worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None) - if worker_id: - # re-start worker - self.workers[worker_id].start() + def _create_worker(self): + """ + Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the + limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread + return the index of the assigned worker thread. Worker threads are storead at self.workers + """ + # Look for a thread in idle status + worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None) + if worker_id is not None: + # unset idle status to avoid race conditions + self.workers[worker_id].idle = False else: worker_id = len(self.workers) if worker_id < self.config["global"]["server.ns_threads"]: @@ -181,44 +188,60 @@ class Ns(object): # reached maximum number of threads, assign VIM to an existing one worker_id = self.next_worker self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"] - if load: - self.workers[worker_id].insert_task(("load_vim", target_id)) return worker_id def assign_vim(self, target_id): - if target_id not in self.assignment: - self.assignment[target_id] = self._create_worker(target_id) - self.assignment_list.append(target_id) + with self.write_lock: + return self._assign_vim(target_id) + + def _assign_vim(self, target_id): + if target_id not in self.vims_assigned: + worker_id = self.vims_assigned[target_id] = self._create_worker() + self.workers[worker_id].insert_task(("load_vim", target_id)) def reload_vim(self, target_id): # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed, # this is because database VIM information is cached for threads working with SDN - # if target_id in self.assignment: - # worker_id = self.assignment[target_id] - # self.workers[worker_id].insert_task(("reload_vim", target_id)) - for worker in self.workers: - if worker.is_alive(): - worker.insert_task(("reload_vim", target_id)) + with self.write_lock: + for worker in self.workers: + if worker and not worker.idle: + worker.insert_task(("reload_vim", target_id)) def unload_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] + with self.write_lock: + return self._unload_vim(target_id) + + def _unload_vim(self, target_id): + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] self.workers[worker_id].insert_task(("unload_vim", target_id)) - del self.assignment[target_id] - self.assignment_list.remove(target_id) + del self.vims_assigned[target_id] def check_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] - else: - worker_id = self._create_worker(target_id, load=False) + with self.write_lock: + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] + else: + worker_id = self._create_worker() worker = self.workers[worker_id] worker.insert_task(("check_vim", target_id)) + def unload_unused_vims(self): + with self.write_lock: + vims_to_unload = [] + for target_id in self.vims_assigned: + if not self.db.get_one("ro_tasks", + q_filter={"target_id": target_id, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']}, + fail_on_empty=False): + vims_to_unload.append(target_id) + for target_id in vims_to_unload: + self._unload_vim(target_id) + def _get_cloud_init(self, where): """ - + Not used as cloud init content is provided in the http body. This method reads cloud init from a file :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' :return: """ @@ -286,7 +309,7 @@ class Ns(object): }, "public_key": public_key, "private_key": private_key_encrypted, - "actions": [], + "actions": [] } self.db.create("ro_nsrs", db_content) return db_content @@ -638,7 +661,7 @@ class Ns(object): target_viminfo = None if target_viminfo is None: # must be deleted - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record_id = "{}.{}".format(db_record, existing_item["id"]) item_ = item if target_vim.startswith("sdn"): @@ -683,7 +706,7 @@ class Ns(object): target_record_id += ".sdn" extra_dict = process_params(target_item, target_viminfo, target_record_id) - self.assign_vim(target_vim) + self._assign_vim(target_vim) task = _create_task( target_vim, item_, "CREATE", target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim), @@ -716,7 +739,7 @@ class Ns(object): if not vdur: raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"])) target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items()) - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index) extra_dict = { "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],