fix 1385: enhance unload vim from ns_thread when not needed
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
index d7a7c1d..5af766c 100644 (file)
@@ -42,7 +42,8 @@ class VimAdminException(Exception):
 
 class VimAdminThread(threading.Thread):
     MAX_TIME_LOCKED = 3600  # 1h
-    MAX_TIME_UNATTENDED = 60  # 600  # 10min
+    MAX_TIME_UNATTENDED = 600  # 10min
+    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
     kafka_topics = ("vim_account", "wim_account", "sdn")
 
     def __init__(self, config, engine):
@@ -59,30 +60,58 @@ class VimAdminThread(threading.Thread):
         self.engine = engine
         self.loop = None
         self.last_rotask_time = 0
+        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
         self.logger = logging.getLogger("ro.vimadmin")
         self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
         self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
 
     async def vim_watcher(self):
-        """ Reads database periodically looking for tasks not processed by nobody because of a restar
+        """ Reads database periodically looking for tasks not processed by nobody because of a reboot
         in order to load this vim"""
+        # firstly read VIMS not processed
+        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
+            unattended_targets = self.db.get_list(target_database,
+                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
+            for target in unattended_targets:
+                target_id = "{}:{}".format(target_database[:3], target["_id"])
+                self.logger.info("ordered to check {}".format(target_id))
+                self.engine.check_vim(target_id)
+
         while not self.to_terminate:
             now = time()
+            processed_vims = []
             if not self.last_rotask_time:
                 self.last_rotask_time = 0
             ro_tasks = self.db.get_list("ro_tasks",
-                                        q_filter={"target_id.ncont": self.engine.assignment_list,
+                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                                                   "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                                   "locked_at.lt": now - self.MAX_TIME_LOCKED,
                                                   "to_check_at.gt": self.last_rotask_time,
                                                   "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
             for ro_task in ro_tasks:
-                if ro_task["target_id"] not in self.engine.assignment_list:
-                    self.engine.assign_vim(ro_task["target_id"])
-                    self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
-
-            await asyncio.sleep(300, loop=self.loop)
+                # if already checked ignore
+                if ro_task["target_id"] in processed_vims:
+                    continue
+                processed_vims.append(ro_task["target_id"])
+                # if already assigned ignore
+                if ro_task["target_id"] in self.engine.get_assigned_vims():
+                    continue
+                # if there is some task locked on this VIM, there is an RO working on it, so ignore
+                if self.db.get_list("ro_tasks",
+                                    q_filter={"target_id": ro_task["target_id"],
+                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                              "locked_at.gt": now - self.MAX_TIME_LOCKED}):
+                    continue
+                # unattended, assign vim
+                self.engine.assign_vim(ro_task["target_id"])
+                self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+
+            # every 2 hours check if there are vims without any ro_task and unload it
+            if now > self.next_check_unused_vim:
+                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
+                self.engine.unload_unused_vims()
+            await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
 
     async def aiomain(self):
         kafka_working = True
@@ -156,7 +185,7 @@ class VimAdminThread(threading.Thread):
         except (DbException, MsgException) as e:
             raise VimAdminException(str(e), http_code=e.http_code)
 
-        self.logger.debug("Starting")
+        self.logger.info("Starting")
         while not self.to_terminate:
             try:
                 self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
@@ -166,7 +195,7 @@ class VimAdminThread(threading.Thread):
                 if not self.to_terminate:
                     self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
 
-        self.logger.debug("Finishing")
+        self.logger.info("Finishing")
         self._stop()
         self.loop.close()