X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns.py;h=f621051486ddf9aeb871e4284d83993be8c5c1d7;hb=refs%2Fchanges%2F80%2F10080%2F6;hp=a9c5ec0a894c231af940147767fde4fe265698d3;hpb=70eeb18e4fcbb8bc3c81c88f270b59966ae4d463;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index a9c5ec0a..f6210514 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -87,8 +87,7 @@ class Ns(object): # If done now it will not be linked to parent not getting its handler and level self.map_topic = {} self.write_lock = None - self.assignment = {} - self.assignment_list = [] + self.vims_assigned = {} self.next_worker = 0 self.plugins = {} self.workers = [] @@ -150,6 +149,9 @@ class Ns(object): self.write_lock = Lock() except (DbException, FsException, MsgException) as e: raise NsException(str(e), http_code=e.http_code) + + def get_assigned_vims(self): + return list(self.vims_assigned.keys()) def stop(self): try: @@ -165,12 +167,17 @@ class Ns(object): for worker in self.workers: worker.insert_task(("terminate",)) - def _create_worker(self, target_id, load=True): - # Look for a thread not alive - worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None) - if worker_id: - # re-start worker - self.workers[worker_id].start() + def _create_worker(self): + """ + Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the + limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread + return the index of the assigned worker thread. Worker threads are storead at self.workers + """ + # Look for a thread in idle status + worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None) + if worker_id is not None: + # unset idle status to avoid race conditions + self.workers[worker_id].idle = False else: worker_id = len(self.workers) if worker_id < self.config["global"]["server.ns_threads"]: @@ -181,44 +188,60 @@ class Ns(object): # reached maximum number of threads, assign VIM to an existing one worker_id = self.next_worker self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"] - if load: - self.workers[worker_id].insert_task(("load_vim", target_id)) return worker_id def assign_vim(self, target_id): - if target_id not in self.assignment: - self.assignment[target_id] = self._create_worker(target_id) - self.assignment_list.append(target_id) + with self.write_lock: + return self._assign_vim(target_id) + + def _assign_vim(self, target_id): + if target_id not in self.vims_assigned: + worker_id = self.vims_assigned[target_id] = self._create_worker() + self.workers[worker_id].insert_task(("load_vim", target_id)) def reload_vim(self, target_id): # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed, # this is because database VIM information is cached for threads working with SDN - # if target_id in self.assignment: - # worker_id = self.assignment[target_id] - # self.workers[worker_id].insert_task(("reload_vim", target_id)) - for worker in self.workers: - if worker.is_alive(): - worker.insert_task(("reload_vim", target_id)) + with self.write_lock: + for worker in self.workers: + if worker and not worker.idle: + worker.insert_task(("reload_vim", target_id)) def unload_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] + with self.write_lock: + return self._unload_vim(target_id) + + def _unload_vim(self, target_id): + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] self.workers[worker_id].insert_task(("unload_vim", target_id)) - del self.assignment[target_id] - self.assignment_list.remove(target_id) + del self.vims_assigned[target_id] def check_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] - else: - worker_id = self._create_worker(target_id, load=False) + with self.write_lock: + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] + else: + worker_id = self._create_worker() worker = self.workers[worker_id] worker.insert_task(("check_vim", target_id)) + def unload_unused_vims(self): + with self.write_lock: + vims_to_unload = [] + for target_id in self.vims_assigned: + if not self.db.get_one("ro_tasks", + q_filter={"target_id": target_id, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']}, + fail_on_empty=False): + vims_to_unload.append(target_id) + for target_id in vims_to_unload: + self._unload_vim(target_id) + def _get_cloud_init(self, where): """ - + Not used as cloud init content is provided in the http body. This method reads cloud init from a file :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' :return: """ @@ -286,7 +309,7 @@ class Ns(object): }, "public_key": public_key, "private_key": private_key_encrypted, - "actions": [], + "actions": [] } self.db.create("ro_nsrs", db_content) return db_content @@ -413,10 +436,8 @@ class Ns(object): flavor_data = { "disk": int(target_flavor["storage-gb"]), - # "ram": max(int(target_flavor["memory-mb"]) // 1024, 1), - # ^ TODO manage at vim_connectors MB instead of GB "ram": int(target_flavor["memory-mb"]), - "vcpus": target_flavor["vcpu-count"], + "vcpus": int(target_flavor["vcpu-count"]), } numa = {} extended = {} @@ -550,8 +571,12 @@ class Ns(object): for iface_index, interface in enumerate(target_vdu["interfaces"]): if interface.get("ns-vld-id"): net_text = ns_preffix + ":vld." + interface["ns-vld-id"] - else: + elif interface.get("vnf-vld-id"): net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] + else: + self.logger.error("Interface {} from vdu {} not connected to any vld".format( + iface_index, target_vdu["vdu-name"])) + continue # interface not connected to any vld extra_dict["depends_on"].append(net_text) net_item = {x: v for x, v in interface.items() if x in ("name", "vpci", "port_security", "port_security_disable_strategy", "floating_ip")} @@ -636,7 +661,7 @@ class Ns(object): target_viminfo = None if target_viminfo is None: # must be deleted - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record_id = "{}.{}".format(db_record, existing_item["id"]) item_ = item if target_vim.startswith("sdn"): @@ -681,7 +706,7 @@ class Ns(object): target_record_id += ".sdn" extra_dict = process_params(target_item, target_viminfo, target_record_id) - self.assign_vim(target_vim) + self._assign_vim(target_vim) task = _create_task( target_vim, item_, "CREATE", target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim), @@ -714,7 +739,7 @@ class Ns(object): if not vdur: raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"])) target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items()) - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index) extra_dict = { "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],