It is in charge of load tasks assigned to VIMs that nobody is in chage of it
"""
-import logging
-import threading
import asyncio
from http import HTTPStatus
+import logging
+import threading
+from time import time
-from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common import dbmemory, dbmongo, msgkafka, msglocal
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
-from time import time
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class VimAdminException(Exception):
-
def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
self.http_code = http_code
Exception.__init__(self, message)
"initial_lock_time": database_object["locked_at"],
"locked_at": database_object["locked_at"],
"thread": thread_object,
- "unlocked": False # True when it is not needed any more
+ "unlocked": False, # True when it is not needed any more
}
LockRenew.renew_list.append(lock_object)
+
return lock_object
@staticmethod
async def renew_locks(self):
while not self.to_terminate:
if not self.renew_list:
- await asyncio.sleep(self.task_locked_time - self.task_relock_time, loop=self.loop)
+ await asyncio.sleep(
+ self.task_locked_time - self.task_relock_time, loop=self.loop
+ )
continue
+
lock_object = self.renew_list[0]
- if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
+
+ if (
+ lock_object["unlocked"]
+ or not lock_object["thread"]
+ or not lock_object["thread"].is_alive()
+ ):
# task has been finished or locker thread is dead, not needed to re-locked.
self.renew_list.pop(0)
continue
locked_at = lock_object["locked_at"]
now = time()
- time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
+ time_to_relock = (
+ locked_at + self.task_locked_time - self.task_relock_time - now
+ )
+
if time_to_relock < 1:
if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
self.renew_list.pop(0)
# re-lock
new_locked_at = locked_at + self.task_locked_time
+
try:
- if self.db.set_one(lock_object["table"],
- update_dict={"locked_at": new_locked_at, "modified_at": now},
- q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
- fail_on_empty=False):
- self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
+ if self.db.set_one(
+ lock_object["table"],
+ update_dict={
+ "locked_at": new_locked_at,
+ "modified_at": now,
+ },
+ q_filter={
+ "_id": lock_object["_id"],
+ "locked_at": locked_at,
+ },
+ fail_on_empty=False,
+ ):
+ self.logger.debug(
+ "Renew lock for {}.{}".format(
+ lock_object["table"], lock_object["_id"]
+ )
+ )
lock_object["locked_at"] = new_locked_at
self.renew_list.append(lock_object)
else:
- self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
- lock_object["_id"]))
+ self.logger.info(
+ "Cannot renew lock for {}.{}".format(
+ lock_object["table"], lock_object["_id"]
+ )
+ )
except Exception as e:
- self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
- lock_object["table"], lock_object["_id"], e))
+ self.logger.error(
+ "Exception when trying to renew lock for {}.{}: {}".format(
+ lock_object["table"], lock_object["_id"], e
+ )
+ )
else:
# wait until it is time to re-lock it
await asyncio.sleep(time_to_relock, loop=self.loop)
def stop(self):
# unlock all locked items
now = time()
+
for lock_object in self.renew_list:
locked_at = lock_object["locked_at"]
+
if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
- self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
- q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
- fail_on_empty=False)
+ self.db.set_one(
+ lock_object["table"],
+ update_dict={"locked_at": 0},
+ q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+ fail_on_empty=False,
+ )
class VimAdminThread(threading.Thread):
self.last_rotask_time = 0
self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
self.logger = logging.getLogger("ro.vimadmin")
- self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus
- self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody
+ # asyncio task for receiving vim actions from kafka bus
+ self.aiomain_task_kafka = None
+ # asyncio task for watching ro_tasks not processed by nobody
+ self.aiomain_task_vim = None
self.aiomain_task_renew_lock = None
# ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
self.lock_renew = LockRenew(config, self.logger)
self.task_locked_time = config["global"]["task_locked_time"]
async def vim_watcher(self):
- """ Reads database periodically looking for tasks not processed by nobody because of a reboot
+ """Reads database periodically looking for tasks not processed by nobody because of a reboot
in order to load this vim"""
# firstly read VIMS not processed
for target_database in ("vim_accounts", "wim_accounts", "sdns"):
- unattended_targets = self.db.get_list(target_database,
- q_filter={"_admin.operations.operationState": "PROCESSING"})
+ unattended_targets = self.db.get_list(
+ target_database,
+ q_filter={"_admin.operations.operationState": "PROCESSING"},
+ )
+
for target in unattended_targets:
target_id = "{}:{}".format(target_database[:3], target["_id"])
self.logger.info("ordered to check {}".format(target_id))
while not self.to_terminate:
now = time()
processed_vims = []
+
if not self.last_rotask_time:
self.last_rotask_time = 0
- ro_tasks = self.db.get_list("ro_tasks",
- q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
- "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.lt": now - self.task_locked_time,
- "to_check_at.gt": self.last_rotask_time,
- "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
+
+ ro_tasks = self.db.get_list(
+ "ro_tasks",
+ q_filter={
+ "target_id.ncont": self.engine.get_assigned_vims(),
+ "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+ "locked_at.lt": now - self.task_locked_time,
+ "to_check_at.gt": self.last_rotask_time,
+ "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
+ },
+ )
self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
+
for ro_task in ro_tasks:
# if already checked ignore
if ro_task["target_id"] in processed_vims:
continue
+
processed_vims.append(ro_task["target_id"])
+
# if already assigned ignore
if ro_task["target_id"] in self.engine.get_assigned_vims():
continue
+
# if there is some task locked on this VIM, there is an RO working on it, so ignore
- if self.db.get_list("ro_tasks",
- q_filter={"target_id": ro_task["target_id"],
- "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.gt": now - self.task_locked_time}):
+ if self.db.get_list(
+ "ro_tasks",
+ q_filter={
+ "target_id": ro_task["target_id"],
+ "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
+ "locked_at.gt": now - self.task_locked_time,
+ },
+ ):
continue
+
# unattended, assign vim
self.engine.assign_vim(ro_task["target_id"])
- self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+ self.logger.debug(
+ "ordered to load {}. Inactivity detected".format(
+ ro_task["target_id"]
+ )
+ )
# every 2 hours check if there are vims without any ro_task and unload it
if now > self.next_check_unused_vim:
self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
self.engine.unload_unused_vims()
+
await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
async def aiomain(self):
try:
if not self.aiomain_task_kafka:
# await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
- await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
+ for kafka_topic in self.kafka_topics:
+ await self.msg.aiowrite(
+ kafka_topic, "echo", "dummy message", loop=self.loop
+ )
kafka_working = True
self.logger.debug("Starting vim_account subscription task")
self.aiomain_task_kafka = asyncio.ensure_future(
- self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
- aiocallback=self._msg_callback),
- loop=self.loop)
+ self.msg.aioread(
+ self.kafka_topics,
+ loop=self.loop,
+ group_id=False,
+ aiocallback=self._msg_callback,
+ ),
+ loop=self.loop,
+ )
+
if not self.aiomain_task_vim:
self.aiomain_task_vim = asyncio.ensure_future(
- self.vim_watcher(),
- loop=self.loop)
+ self.vim_watcher(), loop=self.loop
+ )
+
if not self.aiomain_task_renew_lock:
- self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop)
+ self.aiomain_task_renew_lock = asyncio.ensure_future(
+ self.lock_renew.renew_locks(), loop=self.loop
+ )
done, _ = await asyncio.wait(
- [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
- timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ [
+ self.aiomain_task_kafka,
+ self.aiomain_task_vim,
+ self.aiomain_task_renew_lock,
+ ],
+ timeout=None,
+ loop=self.loop,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
try:
if self.aiomain_task_kafka in done:
exc = self.aiomain_task_kafka.exception()
- self.logger.error("kafka subscription task exception: {}".format(exc))
+ self.logger.error(
+ "kafka subscription task exception: {}".format(exc)
+ )
self.aiomain_task_kafka = None
+
if self.aiomain_task_vim in done:
exc = self.aiomain_task_vim.exception()
- self.logger.error("vim_account watcher task exception: {}".format(exc))
+ self.logger.error(
+ "vim_account watcher task exception: {}".format(exc)
+ )
self.aiomain_task_vim = None
+
if self.aiomain_task_renew_lock in done:
exc = self.aiomain_task_renew_lock.exception()
self.logger.error("renew_locks task exception: {}".format(exc))
self.aiomain_task_renew_lock = None
except asyncio.CancelledError:
- pass
+ self.logger.exception("asyncio.CancelledError occured.")
except Exception as e:
if self.to_terminate:
return
+
if kafka_working:
# logging only first time
- self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+ self.logger.critical(
+ "Error accessing kafka '{}'. Retrying ...".format(e)
+ )
kafka_working = False
+
await asyncio.sleep(10, loop=self.loop)
def run(self):
self.db = dbmemory.DbMemory()
self.db.db_connect(self.config["database"])
else:
- raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
- self.config["database"]["driver"]))
+ raise VimAdminException(
+ "Invalid configuration param '{}' at '[database]':'driver'".format(
+ self.config["database"]["driver"]
+ )
+ )
+
self.lock_renew.start(self.db, self.loop)
if not self.msg:
config_msg = self.config["message"].copy()
config_msg["loop"] = self.loop
+
if config_msg["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.msg.connect(config_msg)
self.msg = msgkafka.MsgKafka()
self.msg.connect(config_msg)
else:
- raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
- config_msg["driver"]))
+ raise VimAdminException(
+ "Invalid configuration param '{}' at '[message]':'driver'".format(
+ config_msg["driver"]
+ )
+ )
except (DbException, MsgException) as e:
raise VimAdminException(str(e), http_code=e.http_code)
self.logger.info("Starting")
while not self.to_terminate:
try:
- self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
+ self.loop.run_until_complete(
+ asyncio.ensure_future(self.aiomain(), loop=self.loop)
+ )
# except asyncio.CancelledError:
# break # if cancelled it should end, breaking loop
except Exception as e:
if not self.to_terminate:
- self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+ self.logger.exception(
+ "Exception '{}' at messaging read loop".format(e), exc_info=True
+ )
self.logger.info("Finishing")
self._stop()
try:
if command == "echo":
return
+
if topic in self.kafka_topics:
- target = topic[0:3] # vim, wim or sdn
+ target = topic[0:3] # vim, wim or sdn
target_id = target + ":" + params["_id"]
+
if command in ("edited", "edit"):
self.engine.reload_vim(target_id)
self.logger.debug("ordered to reload {}".format(target_id))
elif command in ("create", "created"):
self.engine.check_vim(target_id)
self.logger.debug("ordered to check {}".format(target_id))
-
except (DbException, MsgException) as e:
- self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+ self.logger.error(
+ "Error while processing topic={} command={}: {}".format(
+ topic, command, e
+ )
+ )
except Exception as e:
- self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
- exc_info=True)
+ self.logger.exception(
+ "Exception while processing topic={} command={}: {}".format(
+ topic, command, e
+ ),
+ exc_info=True,
+ )
def _stop(self):
"""
try:
if self.db:
self.db.db_disconnect()
+
if self.msg:
self.msg.disconnect()
except (DbException, MsgException) as e:
"""
self.to_terminate = True
self.lock_renew.to_terminate = True
+
if self.aiomain_task_kafka:
self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+
if self.aiomain_task_vim:
self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+
if self.aiomain_task_renew_lock:
self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+
self.lock_renew.stop()