class VimAdminThread(threading.Thread):
MAX_TIME_LOCKED = 3600 # 1h
- MAX_TIME_UNATTENDED = 60 # 600 # 10min
+ MAX_TIME_UNATTENDED = 600 # 10min
+ TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h
kafka_topics = ("vim_account", "wim_account", "sdn")
def __init__(self, config, engine):
self.engine = engine
self.loop = None
self.last_rotask_time = 0
+ self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
self.logger = logging.getLogger("ro.vimadmin")
self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus
self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody
async def vim_watcher(self):
- """ Reads database periodically looking for tasks not processed by nobody because of a restar
+ """ Reads database periodically looking for tasks not processed by nobody because of a reboot
in order to load this vim"""
+ # firstly read VIMS not processed
+ for target_database in ("vim_accounts", "wim_accounts", "sdns"):
+ unattended_targets = self.db.get_list(target_database,
+ q_filter={"_admin.operations.operationState": "PROCESSING"})
+ for target in unattended_targets:
+ target_id = "{}:{}".format(target_database[:3], target["_id"])
+ self.logger.info("ordered to check {}".format(target_id))
+ self.engine.check_vim(target_id)
+
while not self.to_terminate:
now = time()
+ processed_vims = []
if not self.last_rotask_time:
self.last_rotask_time = 0
ro_tasks = self.db.get_list("ro_tasks",
- q_filter={"target_id.ncont": self.engine.assignment_list,
+ q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
"locked_at.lt": now - self.MAX_TIME_LOCKED,
"to_check_at.gt": self.last_rotask_time,
"to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
for ro_task in ro_tasks:
- if ro_task["target_id"] not in self.engine.assignment_list:
- self.engine.assign_vim(ro_task["target_id"])
- self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
-
- await asyncio.sleep(300, loop=self.loop)
+ # if already checked ignore
+ if ro_task["target_id"] in processed_vims:
+ continue
+ processed_vims.append(ro_task["target_id"])
+ # if already assigned ignore
+ if ro_task["target_id"] in self.engine.get_assigned_vims():
+ continue
+ # if there is some task locked on this VIM, there is an RO working on it, so ignore
+ if self.db.get_list("ro_tasks",
+ q_filter={"target_id": ro_task["target_id"],
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+ continue
+ # unattended, assign vim
+ self.engine.assign_vim(ro_task["target_id"])
+ self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+
+ # every 2 hours check if there are vims without any ro_task and unload it
+ if now > self.next_check_unused_vim:
+ self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
+ self.engine.unload_unused_vims()
+ await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
async def aiomain(self):
kafka_working = True
except (DbException, MsgException) as e:
raise VimAdminException(str(e), http_code=e.http_code)
- self.logger.debug("Starting")
+ self.logger.info("Starting")
while not self.to_terminate:
try:
self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
if not self.to_terminate:
self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
- self.logger.debug("Finishing")
+ self.logger.info("Finishing")
self._stop()
self.loop.close()