X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns.py;h=1c2e960a36e6ae4714c92948f1c003b3f33a137d;hb=refs%2Fchanges%2F82%2F10282%2F5;hp=1b4e9ebc92a76db0199e52c9bf0e99451bc898bd;hpb=fb13d2ea0b862011b80f2c64c95bdada2c893ff7;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index 1b4e9ebc..1c2e960a 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -87,8 +87,7 @@ class Ns(object): # If done now it will not be linked to parent not getting its handler and level self.map_topic = {} self.write_lock = None - self.assignment = {} - self.assignment_list = [] + self.vims_assigned = {} self.next_worker = 0 self.plugins = {} self.workers = [] @@ -150,6 +149,9 @@ class Ns(object): self.write_lock = Lock() except (DbException, FsException, MsgException) as e: raise NsException(str(e), http_code=e.http_code) + + def get_assigned_vims(self): + return list(self.vims_assigned.keys()) def stop(self): try: @@ -165,12 +167,17 @@ class Ns(object): for worker in self.workers: worker.insert_task(("terminate",)) - def _create_worker(self, target_id, load=True): - # Look for a thread not alive - worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None) - if worker_id: - # re-start worker - self.workers[worker_id].start() + def _create_worker(self): + """ + Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the + limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread + return the index of the assigned worker thread. Worker threads are storead at self.workers + """ + # Look for a thread in idle status + worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None) + if worker_id is not None: + # unset idle status to avoid race conditions + self.workers[worker_id].idle = False else: worker_id = len(self.workers) if worker_id < self.config["global"]["server.ns_threads"]: @@ -181,44 +188,60 @@ class Ns(object): # reached maximum number of threads, assign VIM to an existing one worker_id = self.next_worker self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"] - if load: - self.workers[worker_id].insert_task(("load_vim", target_id)) return worker_id def assign_vim(self, target_id): - if target_id not in self.assignment: - self.assignment[target_id] = self._create_worker(target_id) - self.assignment_list.append(target_id) + with self.write_lock: + return self._assign_vim(target_id) + + def _assign_vim(self, target_id): + if target_id not in self.vims_assigned: + worker_id = self.vims_assigned[target_id] = self._create_worker() + self.workers[worker_id].insert_task(("load_vim", target_id)) def reload_vim(self, target_id): # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed, # this is because database VIM information is cached for threads working with SDN - # if target_id in self.assignment: - # worker_id = self.assignment[target_id] - # self.workers[worker_id].insert_task(("reload_vim", target_id)) - for worker in self.workers: - if worker.is_alive(): - worker.insert_task(("reload_vim", target_id)) + with self.write_lock: + for worker in self.workers: + if worker and not worker.idle: + worker.insert_task(("reload_vim", target_id)) def unload_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] + with self.write_lock: + return self._unload_vim(target_id) + + def _unload_vim(self, target_id): + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] self.workers[worker_id].insert_task(("unload_vim", target_id)) - del self.assignment[target_id] - self.assignment_list.remove(target_id) + del self.vims_assigned[target_id] def check_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] - else: - worker_id = self._create_worker(target_id, load=False) + with self.write_lock: + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] + else: + worker_id = self._create_worker() worker = self.workers[worker_id] worker.insert_task(("check_vim", target_id)) + def unload_unused_vims(self): + with self.write_lock: + vims_to_unload = [] + for target_id in self.vims_assigned: + if not self.db.get_one("ro_tasks", + q_filter={"target_id": target_id, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']}, + fail_on_empty=False): + vims_to_unload.append(target_id) + for target_id in vims_to_unload: + self._unload_vim(target_id) + def _get_cloud_init(self, where): """ - + Not used as cloud init content is provided in the http body. This method reads cloud init from a file :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' :return: """ @@ -286,7 +309,7 @@ class Ns(object): }, "public_key": public_key, "private_key": private_key_encrypted, - "actions": [], + "actions": [] } self.db.create("ro_nsrs", db_content) return db_content @@ -487,10 +510,11 @@ class Ns(object): "ip_version": "IPv4" if "v4" in ip_profile.get("ip-version", "ipv4") else "IPv6", "subnet_address": ip_profile.get("subnet-address"), "gateway_address": ip_profile.get("gateway-address"), - "dhcp_enabled": ip_profile["dhcp-params"].get("enabled", True), - "dhcp_start_address": ip_profile["dhcp-params"].get("start-address"), - "dhcp_count": ip_profile["dhcp-params"].get("count"), - + "dhcp_enabled": ip_profile["dhcp-params"].get("enabled", True) + if "dhcp_params" in ip_profile else False, + "dhcp_start_address": ip_profile["dhcp-params"].get("start-address") + if "dhcp_params" in ip_profile else None, + "dhcp_count": ip_profile["dhcp-params"].get("count") if "dhcp_params" in ip_profile else None, } if ip_profile.get("dns-server"): ro_ip_profile["dns_address"] = ";".join([v["address"] for v in ip_profile["dns-server"]]) @@ -548,8 +572,12 @@ class Ns(object): for iface_index, interface in enumerate(target_vdu["interfaces"]): if interface.get("ns-vld-id"): net_text = ns_preffix + ":vld." + interface["ns-vld-id"] - else: + elif interface.get("vnf-vld-id"): net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] + else: + self.logger.error("Interface {} from vdu {} not connected to any vld".format( + iface_index, target_vdu["vdu-name"])) + continue # interface not connected to any vld extra_dict["depends_on"].append(net_text) net_item = {x: v for x, v in interface.items() if x in ("name", "vpci", "port_security", "port_security_disable_strategy", "floating_ip")} @@ -634,7 +662,7 @@ class Ns(object): target_viminfo = None if target_viminfo is None: # must be deleted - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record_id = "{}.{}".format(db_record, existing_item["id"]) item_ = item if target_vim.startswith("sdn"): @@ -679,7 +707,7 @@ class Ns(object): target_record_id += ".sdn" extra_dict = process_params(target_item, target_viminfo, target_record_id) - self.assign_vim(target_vim) + self._assign_vim(target_vim) task = _create_task( target_vim, item_, "CREATE", target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim), @@ -712,7 +740,7 @@ class Ns(object): if not vdur: raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"])) target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items()) - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index) extra_dict = { "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],