X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fvim_admin.py;h=5af766c70fa9df14233a2ac1d264a6db4f850b5c;hb=refs%2Fchanges%2F80%2F10080%2F6;hp=d7a7c1dab5750055a8b47bf0d7b7372cff2b7e1b;hpb=b64cf0c1aab5292a6d32e9d57bedfb28136b54ed;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py index d7a7c1da..5af766c7 100644 --- a/NG-RO/osm_ng_ro/vim_admin.py +++ b/NG-RO/osm_ng_ro/vim_admin.py @@ -42,7 +42,8 @@ class VimAdminException(Exception): class VimAdminThread(threading.Thread): MAX_TIME_LOCKED = 3600 # 1h - MAX_TIME_UNATTENDED = 60 # 600 # 10min + MAX_TIME_UNATTENDED = 600 # 10min + TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h kafka_topics = ("vim_account", "wim_account", "sdn") def __init__(self, config, engine): @@ -59,30 +60,58 @@ class VimAdminThread(threading.Thread): self.engine = engine self.loop = None self.last_rotask_time = 0 + self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM self.logger = logging.getLogger("ro.vimadmin") self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody async def vim_watcher(self): - """ Reads database periodically looking for tasks not processed by nobody because of a restar + """ Reads database periodically looking for tasks not processed by nobody because of a reboot in order to load this vim""" + # firstly read VIMS not processed + for target_database in ("vim_accounts", "wim_accounts", "sdns"): + unattended_targets = self.db.get_list(target_database, + q_filter={"_admin.operations.operationState": "PROCESSING"}) + for target in unattended_targets: + target_id = "{}:{}".format(target_database[:3], target["_id"]) + self.logger.info("ordered to check {}".format(target_id)) + self.engine.check_vim(target_id) + while not self.to_terminate: now = time() + processed_vims = [] if not self.last_rotask_time: self.last_rotask_time = 0 ro_tasks = self.db.get_list("ro_tasks", - q_filter={"target_id.ncont": self.engine.assignment_list, + q_filter={"target_id.ncont": self.engine.get_assigned_vims(), "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], "locked_at.lt": now - self.MAX_TIME_LOCKED, "to_check_at.gt": self.last_rotask_time, "to_check_at.lte": now - self.MAX_TIME_UNATTENDED}) self.last_rotask_time = now - self.MAX_TIME_UNATTENDED for ro_task in ro_tasks: - if ro_task["target_id"] not in self.engine.assignment_list: - self.engine.assign_vim(ro_task["target_id"]) - self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"])) - - await asyncio.sleep(300, loop=self.loop) + # if already checked ignore + if ro_task["target_id"] in processed_vims: + continue + processed_vims.append(ro_task["target_id"]) + # if already assigned ignore + if ro_task["target_id"] in self.engine.get_assigned_vims(): + continue + # if there is some task locked on this VIM, there is an RO working on it, so ignore + if self.db.get_list("ro_tasks", + q_filter={"target_id": ro_task["target_id"], + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], + "locked_at.gt": now - self.MAX_TIME_LOCKED}): + continue + # unattended, assign vim + self.engine.assign_vim(ro_task["target_id"]) + self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"])) + + # every 2 hours check if there are vims without any ro_task and unload it + if now > self.next_check_unused_vim: + self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM + self.engine.unload_unused_vims() + await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop) async def aiomain(self): kafka_working = True @@ -156,7 +185,7 @@ class VimAdminThread(threading.Thread): except (DbException, MsgException) as e: raise VimAdminException(str(e), http_code=e.http_code) - self.logger.debug("Starting") + self.logger.info("Starting") while not self.to_terminate: try: self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop)) @@ -166,7 +195,7 @@ class VimAdminThread(threading.Thread): if not self.to_terminate: self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True) - self.logger.debug("Finishing") + self.logger.info("Finishing") self._stop() self.loop.close()