From 8615352fd9db8a157462ad848e37260d4f3468d2 Mon Sep 17 00:00:00 2001 From: tierno Date: Sun, 6 Dec 2020 18:27:16 +0000 Subject: [PATCH] fix 1385: enhance unload vim from ns_thread when not needed control of threads in in idle status (without vims to process) and reuse it when needed Change-Id: Ib7c5023eec18229fb86a1632b63aca5aef8d2a14 Signed-off-by: tierno --- NG-RO/osm_ng_ro/ns.py | 87 +++++++++++++++++++++++------------- NG-RO/osm_ng_ro/ns_thread.py | 27 +++++++---- NG-RO/osm_ng_ro/vim_admin.py | 49 +++++++++++++++----- 3 files changed, 113 insertions(+), 50 deletions(-) diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index aa1040d7..f6210514 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -87,8 +87,7 @@ class Ns(object): # If done now it will not be linked to parent not getting its handler and level self.map_topic = {} self.write_lock = None - self.assignment = {} - self.assignment_list = [] + self.vims_assigned = {} self.next_worker = 0 self.plugins = {} self.workers = [] @@ -150,6 +149,9 @@ class Ns(object): self.write_lock = Lock() except (DbException, FsException, MsgException) as e: raise NsException(str(e), http_code=e.http_code) + + def get_assigned_vims(self): + return list(self.vims_assigned.keys()) def stop(self): try: @@ -165,12 +167,17 @@ class Ns(object): for worker in self.workers: worker.insert_task(("terminate",)) - def _create_worker(self, target_id, load=True): - # Look for a thread not alive - worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None) - if worker_id: - # re-start worker - self.workers[worker_id].start() + def _create_worker(self): + """ + Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the + limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread + return the index of the assigned worker thread. Worker threads are storead at self.workers + """ + # Look for a thread in idle status + worker_id = next((i for i in range(len(self.workers)) if self.workers[i] and self.workers[i].idle), None) + if worker_id is not None: + # unset idle status to avoid race conditions + self.workers[worker_id].idle = False else: worker_id = len(self.workers) if worker_id < self.config["global"]["server.ns_threads"]: @@ -181,44 +188,60 @@ class Ns(object): # reached maximum number of threads, assign VIM to an existing one worker_id = self.next_worker self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"] - if load: - self.workers[worker_id].insert_task(("load_vim", target_id)) return worker_id def assign_vim(self, target_id): - if target_id not in self.assignment: - self.assignment[target_id] = self._create_worker(target_id) - self.assignment_list.append(target_id) + with self.write_lock: + return self._assign_vim(target_id) + + def _assign_vim(self, target_id): + if target_id not in self.vims_assigned: + worker_id = self.vims_assigned[target_id] = self._create_worker() + self.workers[worker_id].insert_task(("load_vim", target_id)) def reload_vim(self, target_id): # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed, # this is because database VIM information is cached for threads working with SDN - # if target_id in self.assignment: - # worker_id = self.assignment[target_id] - # self.workers[worker_id].insert_task(("reload_vim", target_id)) - for worker in self.workers: - if worker.is_alive(): - worker.insert_task(("reload_vim", target_id)) + with self.write_lock: + for worker in self.workers: + if worker and not worker.idle: + worker.insert_task(("reload_vim", target_id)) def unload_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] + with self.write_lock: + return self._unload_vim(target_id) + + def _unload_vim(self, target_id): + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] self.workers[worker_id].insert_task(("unload_vim", target_id)) - del self.assignment[target_id] - self.assignment_list.remove(target_id) + del self.vims_assigned[target_id] def check_vim(self, target_id): - if target_id in self.assignment: - worker_id = self.assignment[target_id] - else: - worker_id = self._create_worker(target_id, load=False) + with self.write_lock: + if target_id in self.vims_assigned: + worker_id = self.vims_assigned[target_id] + else: + worker_id = self._create_worker() worker = self.workers[worker_id] worker.insert_task(("check_vim", target_id)) + def unload_unused_vims(self): + with self.write_lock: + vims_to_unload = [] + for target_id in self.vims_assigned: + if not self.db.get_one("ro_tasks", + q_filter={"target_id": target_id, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']}, + fail_on_empty=False): + vims_to_unload.append(target_id) + for target_id in vims_to_unload: + self._unload_vim(target_id) + def _get_cloud_init(self, where): """ - + Not used as cloud init content is provided in the http body. This method reads cloud init from a file :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' :return: """ @@ -286,7 +309,7 @@ class Ns(object): }, "public_key": public_key, "private_key": private_key_encrypted, - "actions": [], + "actions": [] } self.db.create("ro_nsrs", db_content) return db_content @@ -638,7 +661,7 @@ class Ns(object): target_viminfo = None if target_viminfo is None: # must be deleted - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record_id = "{}.{}".format(db_record, existing_item["id"]) item_ = item if target_vim.startswith("sdn"): @@ -683,7 +706,7 @@ class Ns(object): target_record_id += ".sdn" extra_dict = process_params(target_item, target_viminfo, target_record_id) - self.assign_vim(target_vim) + self._assign_vim(target_vim) task = _create_task( target_vim, item_, "CREATE", target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim), @@ -716,7 +739,7 @@ class Ns(object): if not vdur: raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"])) target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items()) - self.assign_vim(target_vim) + self._assign_vim(target_vim) target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index) extra_dict = { "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])], diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 0ec84525..c7c5464b 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -780,9 +780,7 @@ class NsWorker(threading.Thread): REFRESH_IMAGE = 3600 * 10 REFRESH_DELETE = 3600 * 10 QUEUE_SIZE = 2000 - # TODO delete assigment_lock = Lock() terminate = False - # TODO delete assignment = {} MAX_TIME_LOCKED = 3600 MAX_TIME_VIM_LOCKED = 120 @@ -815,6 +813,7 @@ class NsWorker(threading.Thread): } self.time_last_task_processed = None self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db + self.idle = True # it is idle when there are not vim_targets associated def insert_task(self, task): try: @@ -884,10 +883,11 @@ class NsWorker(threading.Thread): :return: None. """ try: - target, _, _id = target_id.partition(":") self.db_vims.pop(target_id, None) self.my_vims.pop(target_id, None) - self.vim_targets.remove(target_id) + if target_id in self.vim_targets: + self.vim_targets.remove(target_id) + self.logger.info("Unloaded {}".format(target_id)) rmtree("{}:{}".format(target_id, self.worker_index)) except FileNotFoundError: pass # this is raised by rmtree if folder does not exist @@ -906,7 +906,7 @@ class NsWorker(threading.Thread): unset_dict = {} op_text = "" step = "" - loaded = target_id in self.my_vims + loaded = target_id in self.vim_targets target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns" try: step = "Getting {} from db".format(target_id) @@ -1451,20 +1451,31 @@ class NsWorker(threading.Thread): def run(self): # load database - self.logger.debug("Starting") + self.logger.info("Starting") while True: # step 1: get commands from queue try: - task = self.task_queue.get(block=False if self.my_vims else True) + if self.vim_targets: + task = self.task_queue.get(block=False) + else: + if not self.idle: + self.logger.debug("enters in idle state") + self.idle = True + task = self.task_queue.get(block=True) + self.idle = False + if task[0] == "terminate": break elif task[0] == "load_vim": + self.logger.info("order to load vim {}".format(task[1])) self._load_vim(task[1]) elif task[0] == "unload_vim": + self.logger.info("order to unload vim {}".format(task[1])) self._unload_vim(task[1]) elif task[0] == "reload_vim": self._reload_vim(task[1]) elif task[0] == "check_vim": + self.logger.info("order to check vim {}".format(task[1])) self._check_vim(task[1]) continue except Exception as e: @@ -1487,4 +1498,4 @@ class NsWorker(threading.Thread): except Exception as e: self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True) - self.logger.debug("Finishing") + self.logger.info("Finishing") diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py index d7a7c1da..5af766c7 100644 --- a/NG-RO/osm_ng_ro/vim_admin.py +++ b/NG-RO/osm_ng_ro/vim_admin.py @@ -42,7 +42,8 @@ class VimAdminException(Exception): class VimAdminThread(threading.Thread): MAX_TIME_LOCKED = 3600 # 1h - MAX_TIME_UNATTENDED = 60 # 600 # 10min + MAX_TIME_UNATTENDED = 600 # 10min + TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h kafka_topics = ("vim_account", "wim_account", "sdn") def __init__(self, config, engine): @@ -59,30 +60,58 @@ class VimAdminThread(threading.Thread): self.engine = engine self.loop = None self.last_rotask_time = 0 + self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM self.logger = logging.getLogger("ro.vimadmin") self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody async def vim_watcher(self): - """ Reads database periodically looking for tasks not processed by nobody because of a restar + """ Reads database periodically looking for tasks not processed by nobody because of a reboot in order to load this vim""" + # firstly read VIMS not processed + for target_database in ("vim_accounts", "wim_accounts", "sdns"): + unattended_targets = self.db.get_list(target_database, + q_filter={"_admin.operations.operationState": "PROCESSING"}) + for target in unattended_targets: + target_id = "{}:{}".format(target_database[:3], target["_id"]) + self.logger.info("ordered to check {}".format(target_id)) + self.engine.check_vim(target_id) + while not self.to_terminate: now = time() + processed_vims = [] if not self.last_rotask_time: self.last_rotask_time = 0 ro_tasks = self.db.get_list("ro_tasks", - q_filter={"target_id.ncont": self.engine.assignment_list, + q_filter={"target_id.ncont": self.engine.get_assigned_vims(), "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], "locked_at.lt": now - self.MAX_TIME_LOCKED, "to_check_at.gt": self.last_rotask_time, "to_check_at.lte": now - self.MAX_TIME_UNATTENDED}) self.last_rotask_time = now - self.MAX_TIME_UNATTENDED for ro_task in ro_tasks: - if ro_task["target_id"] not in self.engine.assignment_list: - self.engine.assign_vim(ro_task["target_id"]) - self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"])) - - await asyncio.sleep(300, loop=self.loop) + # if already checked ignore + if ro_task["target_id"] in processed_vims: + continue + processed_vims.append(ro_task["target_id"]) + # if already assigned ignore + if ro_task["target_id"] in self.engine.get_assigned_vims(): + continue + # if there is some task locked on this VIM, there is an RO working on it, so ignore + if self.db.get_list("ro_tasks", + q_filter={"target_id": ro_task["target_id"], + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], + "locked_at.gt": now - self.MAX_TIME_LOCKED}): + continue + # unattended, assign vim + self.engine.assign_vim(ro_task["target_id"]) + self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"])) + + # every 2 hours check if there are vims without any ro_task and unload it + if now > self.next_check_unused_vim: + self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM + self.engine.unload_unused_vims() + await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop) async def aiomain(self): kafka_working = True @@ -156,7 +185,7 @@ class VimAdminThread(threading.Thread): except (DbException, MsgException) as e: raise VimAdminException(str(e), http_code=e.http_code) - self.logger.debug("Starting") + self.logger.info("Starting") while not self.to_terminate: try: self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop)) @@ -166,7 +195,7 @@ class VimAdminThread(threading.Thread): if not self.to_terminate: self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True) - self.logger.debug("Finishing") + self.logger.info("Finishing") self._stop() self.loop.close() -- 2.17.1