# If done now it will not be linked to parent not getting its handler and level
self.map_topic = {}
self.write_lock = None
- self.assignment = {}
- self.assignment_list = []
+ self.vims_assigned = {}
self.next_worker = 0
self.plugins = {}
self.workers = []
self.write_lock = Lock()
except (DbException, FsException, MsgException) as e:
raise NsException(str(e), http_code=e.http_code)
+
+ def get_assigned_vims(self):
+ return list(self.vims_assigned.keys())
def stop(self):
try:
for worker in self.workers:
worker.insert_task(("terminate",))
- def _create_worker(self, target_id, load=True):
- # Look for a thread not alive
- worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
- if worker_id:
- # re-start worker
- self.workers[worker_id].start()
+ def _create_worker(self):
+ """
+ Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the
+ limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread
+ return the index of the assigned worker thread. Worker threads are storead at self.workers
+ """
+ # Look for a thread in idle status
+ worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None)
+ if worker_id is not None:
+ # unset idle status to avoid race conditions
+ self.workers[worker_id].idle = False
else:
worker_id = len(self.workers)
if worker_id < self.config["global"]["server.ns_threads"]:
# reached maximum number of threads, assign VIM to an existing one
worker_id = self.next_worker
self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"]
- if load:
- self.workers[worker_id].insert_task(("load_vim", target_id))
return worker_id
def assign_vim(self, target_id):
- if target_id not in self.assignment:
- self.assignment[target_id] = self._create_worker(target_id)
- self.assignment_list.append(target_id)
+ with self.write_lock:
+ return self._assign_vim(target_id)
+
+ def _assign_vim(self, target_id):
+ if target_id not in self.vims_assigned:
+ worker_id = self.vims_assigned[target_id] = self._create_worker()
+ self.workers[worker_id].insert_task(("load_vim", target_id))
def reload_vim(self, target_id):
# send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
# this is because database VIM information is cached for threads working with SDN
- # if target_id in self.assignment:
- # worker_id = self.assignment[target_id]
- # self.workers[worker_id].insert_task(("reload_vim", target_id))
- for worker in self.workers:
- if worker.is_alive():
- worker.insert_task(("reload_vim", target_id))
+ with self.write_lock:
+ for worker in self.workers:
+ if worker and not worker.idle:
+ worker.insert_task(("reload_vim", target_id))
def unload_vim(self, target_id):
- if target_id in self.assignment:
- worker_id = self.assignment[target_id]
+ with self.write_lock:
+ return self._unload_vim(target_id)
+
+ def _unload_vim(self, target_id):
+ if target_id in self.vims_assigned:
+ worker_id = self.vims_assigned[target_id]
self.workers[worker_id].insert_task(("unload_vim", target_id))
- del self.assignment[target_id]
- self.assignment_list.remove(target_id)
+ del self.vims_assigned[target_id]
def check_vim(self, target_id):
- if target_id in self.assignment:
- worker_id = self.assignment[target_id]
- else:
- worker_id = self._create_worker(target_id, load=False)
+ with self.write_lock:
+ if target_id in self.vims_assigned:
+ worker_id = self.vims_assigned[target_id]
+ else:
+ worker_id = self._create_worker()
worker = self.workers[worker_id]
worker.insert_task(("check_vim", target_id))
+ def unload_unused_vims(self):
+ with self.write_lock:
+ vims_to_unload = []
+ for target_id in self.vims_assigned:
+ if not self.db.get_one("ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']},
+ fail_on_empty=False):
+ vims_to_unload.append(target_id)
+ for target_id in vims_to_unload:
+ self._unload_vim(target_id)
+
def _get_cloud_init(self, where):
"""
-
+ Not used as cloud init content is provided in the http body. This method reads cloud init from a file
:param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
:return:
"""
},
"public_key": public_key,
"private_key": private_key_encrypted,
- "actions": [],
+ "actions": []
}
self.db.create("ro_nsrs", db_content)
return db_content
target_viminfo = None
if target_viminfo is None:
# must be deleted
- self.assign_vim(target_vim)
+ self._assign_vim(target_vim)
target_record_id = "{}.{}".format(db_record, existing_item["id"])
item_ = item
if target_vim.startswith("sdn"):
target_record_id += ".sdn"
extra_dict = process_params(target_item, target_viminfo, target_record_id)
- self.assign_vim(target_vim)
+ self._assign_vim(target_vim)
task = _create_task(
target_vim, item_, "CREATE",
target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
if not vdur:
raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items())
- self.assign_vim(target_vim)
+ self._assign_vim(target_vim)
target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
extra_dict = {
"depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
REFRESH_IMAGE = 3600 * 10
REFRESH_DELETE = 3600 * 10
QUEUE_SIZE = 2000
- # TODO delete assigment_lock = Lock()
terminate = False
- # TODO delete assignment = {}
MAX_TIME_LOCKED = 3600
MAX_TIME_VIM_LOCKED = 120
}
self.time_last_task_processed = None
self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+ self.idle = True # it is idle when there are not vim_targets associated
def insert_task(self, task):
try:
:return: None.
"""
try:
- target, _, _id = target_id.partition(":")
self.db_vims.pop(target_id, None)
self.my_vims.pop(target_id, None)
- self.vim_targets.remove(target_id)
+ if target_id in self.vim_targets:
+ self.vim_targets.remove(target_id)
+ self.logger.info("Unloaded {}".format(target_id))
rmtree("{}:{}".format(target_id, self.worker_index))
except FileNotFoundError:
pass # this is raised by rmtree if folder does not exist
unset_dict = {}
op_text = ""
step = ""
- loaded = target_id in self.my_vims
+ loaded = target_id in self.vim_targets
target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
try:
step = "Getting {} from db".format(target_id)
def run(self):
# load database
- self.logger.debug("Starting")
+ self.logger.info("Starting")
while True:
# step 1: get commands from queue
try:
- task = self.task_queue.get(block=False if self.my_vims else True)
+ if self.vim_targets:
+ task = self.task_queue.get(block=False)
+ else:
+ if not self.idle:
+ self.logger.debug("enters in idle state")
+ self.idle = True
+ task = self.task_queue.get(block=True)
+ self.idle = False
+
if task[0] == "terminate":
break
elif task[0] == "load_vim":
+ self.logger.info("order to load vim {}".format(task[1]))
self._load_vim(task[1])
elif task[0] == "unload_vim":
+ self.logger.info("order to unload vim {}".format(task[1]))
self._unload_vim(task[1])
elif task[0] == "reload_vim":
self._reload_vim(task[1])
elif task[0] == "check_vim":
+ self.logger.info("order to check vim {}".format(task[1]))
self._check_vim(task[1])
continue
except Exception as e:
except Exception as e:
self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
- self.logger.debug("Finishing")
+ self.logger.info("Finishing")
class VimAdminThread(threading.Thread):
MAX_TIME_LOCKED = 3600 # 1h
- MAX_TIME_UNATTENDED = 60 # 600 # 10min
+ MAX_TIME_UNATTENDED = 600 # 10min
+ TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h
kafka_topics = ("vim_account", "wim_account", "sdn")
def __init__(self, config, engine):
self.engine = engine
self.loop = None
self.last_rotask_time = 0
+ self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
self.logger = logging.getLogger("ro.vimadmin")
self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus
self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody
async def vim_watcher(self):
- """ Reads database periodically looking for tasks not processed by nobody because of a restar
+ """ Reads database periodically looking for tasks not processed by nobody because of a reboot
in order to load this vim"""
+ # firstly read VIMS not processed
+ for target_database in ("vim_accounts", "wim_accounts", "sdns"):
+ unattended_targets = self.db.get_list(target_database,
+ q_filter={"_admin.operations.operationState": "PROCESSING"})
+ for target in unattended_targets:
+ target_id = "{}:{}".format(target_database[:3], target["_id"])
+ self.logger.info("ordered to check {}".format(target_id))
+ self.engine.check_vim(target_id)
+
while not self.to_terminate:
now = time()
+ processed_vims = []
if not self.last_rotask_time:
self.last_rotask_time = 0
ro_tasks = self.db.get_list("ro_tasks",
- q_filter={"target_id.ncont": self.engine.assignment_list,
+ q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
"locked_at.lt": now - self.MAX_TIME_LOCKED,
"to_check_at.gt": self.last_rotask_time,
"to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
for ro_task in ro_tasks:
- if ro_task["target_id"] not in self.engine.assignment_list:
- self.engine.assign_vim(ro_task["target_id"])
- self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
-
- await asyncio.sleep(300, loop=self.loop)
+ # if already checked ignore
+ if ro_task["target_id"] in processed_vims:
+ continue
+ processed_vims.append(ro_task["target_id"])
+ # if already assigned ignore
+ if ro_task["target_id"] in self.engine.get_assigned_vims():
+ continue
+ # if there is some task locked on this VIM, there is an RO working on it, so ignore
+ if self.db.get_list("ro_tasks",
+ q_filter={"target_id": ro_task["target_id"],
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+ continue
+ # unattended, assign vim
+ self.engine.assign_vim(ro_task["target_id"])
+ self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+
+ # every 2 hours check if there are vims without any ro_task and unload it
+ if now > self.next_check_unused_vim:
+ self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
+ self.engine.unload_unused_vims()
+ await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
async def aiomain(self):
kafka_working = True
except (DbException, MsgException) as e:
raise VimAdminException(str(e), http_code=e.http_code)
- self.logger.debug("Starting")
+ self.logger.info("Starting")
while not self.to_terminate:
try:
self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
if not self.to_terminate:
self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
- self.logger.debug("Finishing")
+ self.logger.info("Finishing")
self._stop()
self.loop.close()