# If done now it will not be linked to parent not getting its handler and level
self.map_topic = {}
self.write_lock = None
- self.assignment = {}
- self.assignment_list = []
+ self.vims_assigned = {}
self.next_worker = 0
self.plugins = {}
self.workers = []
self.write_lock = Lock()
except (DbException, FsException, MsgException) as e:
raise NsException(str(e), http_code=e.http_code)
+
+ def get_assigned_vims(self):
+ return list(self.vims_assigned.keys())
def stop(self):
try:
for worker in self.workers:
worker.insert_task(("terminate",))
- def _create_worker(self, target_id, load=True):
- # Look for a thread not alive
- worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
- if worker_id:
- # re-start worker
- self.workers[worker_id].start()
+ def _create_worker(self):
+ """
+ Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the
+ limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread
+ return the index of the assigned worker thread. Worker threads are storead at self.workers
+ """
+ # Look for a thread in idle status
+ worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None)
+ if worker_id is not None:
+ # unset idle status to avoid race conditions
+ self.workers[worker_id].idle = False
else:
worker_id = len(self.workers)
if worker_id < self.config["global"]["server.ns_threads"]:
# reached maximum number of threads, assign VIM to an existing one
worker_id = self.next_worker
self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"]
- if load:
- self.workers[worker_id].insert_task(("load_vim", target_id))
return worker_id
def assign_vim(self, target_id):
- if target_id not in self.assignment:
- self.assignment[target_id] = self._create_worker(target_id)
- self.assignment_list.append(target_id)
+ with self.write_lock:
+ return self._assign_vim(target_id)
+
+ def _assign_vim(self, target_id):
+ if target_id not in self.vims_assigned:
+ worker_id = self.vims_assigned[target_id] = self._create_worker()
+ self.workers[worker_id].insert_task(("load_vim", target_id))
def reload_vim(self, target_id):
# send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
# this is because database VIM information is cached for threads working with SDN
- # if target_id in self.assignment:
- # worker_id = self.assignment[target_id]
- # self.workers[worker_id].insert_task(("reload_vim", target_id))
- for worker in self.workers:
- if worker.is_alive():
- worker.insert_task(("reload_vim", target_id))
+ with self.write_lock:
+ for worker in self.workers:
+ if worker and not worker.idle:
+ worker.insert_task(("reload_vim", target_id))
def unload_vim(self, target_id):
- if target_id in self.assignment:
- worker_id = self.assignment[target_id]
+ with self.write_lock:
+ return self._unload_vim(target_id)
+
+ def _unload_vim(self, target_id):
+ if target_id in self.vims_assigned:
+ worker_id = self.vims_assigned[target_id]
self.workers[worker_id].insert_task(("unload_vim", target_id))
- del self.assignment[target_id]
- self.assignment_list.remove(target_id)
+ del self.vims_assigned[target_id]
def check_vim(self, target_id):
- if target_id in self.assignment:
- worker_id = self.assignment[target_id]
- else:
- worker_id = self._create_worker(target_id, load=False)
+ with self.write_lock:
+ if target_id in self.vims_assigned:
+ worker_id = self.vims_assigned[target_id]
+ else:
+ worker_id = self._create_worker()
worker = self.workers[worker_id]
worker.insert_task(("check_vim", target_id))
+ def unload_unused_vims(self):
+ with self.write_lock:
+ vims_to_unload = []
+ for target_id in self.vims_assigned:
+ if not self.db.get_one("ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']},
+ fail_on_empty=False):
+ vims_to_unload.append(target_id)
+ for target_id in vims_to_unload:
+ self._unload_vim(target_id)
+
def _get_cloud_init(self, where):
"""
-
+ Not used as cloud init content is provided in the http body. This method reads cloud init from a file
:param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
:return:
"""
},
"public_key": public_key,
"private_key": private_key_encrypted,
- "actions": [],
+ "actions": []
}
self.db.create("ro_nsrs", db_content)
return db_content
for iface_index, interface in enumerate(target_vdu["interfaces"]):
if interface.get("ns-vld-id"):
net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
- else:
+ elif interface.get("vnf-vld-id"):
net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
+ else:
+ self.logger.error("Interface {} from vdu {} not connected to any vld".format(
+ iface_index, target_vdu["vdu-name"]))
+ continue # interface not connected to any vld
extra_dict["depends_on"].append(net_text)
net_item = {x: v for x, v in interface.items() if x in
("name", "vpci", "port_security", "port_security_disable_strategy", "floating_ip")}
target_viminfo = None
if target_viminfo is None:
# must be deleted
- self.assign_vim(target_vim)
+ self._assign_vim(target_vim)
target_record_id = "{}.{}".format(db_record, existing_item["id"])
item_ = item
if target_vim.startswith("sdn"):
target_record_id += ".sdn"
extra_dict = process_params(target_item, target_viminfo, target_record_id)
- self.assign_vim(target_vim)
+ self._assign_vim(target_vim)
task = _create_task(
target_vim, item_, "CREATE",
target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
if not vdur:
raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items())
- self.assign_vim(target_vim)
+ self._assign_vim(target_vim)
target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
extra_dict = {
"depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],