It is in charge of load tasks assigned to VIMs that nobody is in chage of it
"""
-import logging
-import threading
import asyncio
from http import HTTPStatus
+import logging
+import threading
+from time import time
-from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common import dbmemory, dbmongo, msgkafka, msglocal
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
-from time import time
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class LockRenew:
-
renew_list = []
# ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
# be renewed. The time order is achieved as it is appended at the end
self.config = config
self.logger = logger
self.to_terminate = False
- self.loop = None
self.db = None
self.task_locked_time = config["global"]["task_locked_time"]
self.task_relock_time = config["global"]["task_relock_time"]
self.task_max_locked_time = config["global"]["task_max_locked_time"]
- def start(self, db, loop):
+ def start(self, db):
self.db = db
- self.loop = loop
@staticmethod
def add_lock_object(database_table, database_object, thread_object):
async def renew_locks(self):
while not self.to_terminate:
if not self.renew_list:
- await asyncio.sleep(
- self.task_locked_time - self.task_relock_time, loop=self.loop
- )
+ await asyncio.sleep(self.task_locked_time - self.task_relock_time)
continue
lock_object = self.renew_list[0]
)
else:
# wait until it is time to re-lock it
- await asyncio.sleep(time_to_relock, loop=self.loop)
+ await asyncio.sleep(time_to_relock)
def stop(self):
# unlock all locked items
self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
self.engine.unload_unused_vims()
- await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop)
+ await asyncio.sleep(self.MAX_TIME_UNATTENDED)
async def aiomain(self):
kafka_working = True
while not self.to_terminate:
try:
if not self.aiomain_task_kafka:
- # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
- await self.msg.aiowrite(
- "vim_account", "echo", "dummy message", loop=self.loop
- )
+ for kafka_topic in self.kafka_topics:
+ await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
kafka_working = True
self.logger.debug("Starting vim_account subscription task")
self.aiomain_task_kafka = asyncio.ensure_future(
self.msg.aioread(
self.kafka_topics,
- loop=self.loop,
group_id=False,
aiocallback=self._msg_callback,
),
- loop=self.loop,
)
if not self.aiomain_task_vim:
- self.aiomain_task_vim = asyncio.ensure_future(
- self.vim_watcher(), loop=self.loop
- )
+ self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())
if not self.aiomain_task_renew_lock:
self.aiomain_task_renew_lock = asyncio.ensure_future(
- self.lock_renew.renew_locks(), loop=self.loop
+ self.lock_renew.renew_locks()
)
done, _ = await asyncio.wait(
self.aiomain_task_renew_lock,
],
timeout=None,
- loop=self.loop,
return_when=asyncio.FIRST_COMPLETED,
)
self.logger.error("renew_locks task exception: {}".format(exc))
self.aiomain_task_renew_lock = None
except asyncio.CancelledError:
- pass
+ self.logger.exception("asyncio.CancelledError occured.")
except Exception as e:
if self.to_terminate:
)
kafka_working = False
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
def run(self):
"""
)
)
- self.lock_renew.start(self.db, self.loop)
+ self.lock_renew.start(self.db)
if not self.msg:
config_msg = self.config["message"].copy()
- config_msg["loop"] = self.loop
if config_msg["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.logger.info("Starting")
while not self.to_terminate:
try:
- self.loop.run_until_complete(
- asyncio.ensure_future(self.aiomain(), loop=self.loop)
- )
- # except asyncio.CancelledError:
- # break # if cancelled it should end, breaking loop
+ asyncio.run(self.main_task())
except Exception as e:
if not self.to_terminate:
self.logger.exception(
self._stop()
self.loop.close()
+ async def main_task(self):
+ task = asyncio.ensure_future(self.aiomain())
+ await task
+
async def _msg_callback(self, topic, command, params):
"""
Callback to process a received message from kafka
self.lock_renew.to_terminate = True
if self.aiomain_task_kafka:
- self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+ self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel())
if self.aiomain_task_vim:
- self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
+ self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel())
if self.aiomain_task_renew_lock:
- self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
+ self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel())
self.lock_renew.stop()