X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=2fc479feb3da55673ee9632f3950a1989eba26ab;hb=refs%2Fchanges%2F47%2F13047%2F4;hp=4bffba9396193c1fc641409e6645025ccec90608;hpb=8bbeeb07c4be6b2396549af9ca8c389d4b888809;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 4bffba9..2fc479f 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -63,7 +63,6 @@ min_common_version = "0.1.19" class Lcm: - ping_interval_pace = ( 120 # how many time ping is send once is confirmed all is running ) @@ -71,7 +70,7 @@ class Lcm: main_config = LcmCfg() - def __init__(self, config_file, loop=None): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -97,7 +96,6 @@ class Lcm: self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict())) # TODO: check if lcm_hc.py is necessary self.health_check_file = get_health_check_file(self.main_config.to_dict()) - self.loop = loop or asyncio.get_event_loop() self.ns = ( self.netslice ) = ( @@ -169,7 +167,7 @@ class Lcm: # copy message configuration in order to remove 'group_id' for msg_admin config_message = self.main_config.message.to_dict() - config_message["loop"] = self.loop + config_message["loop"] = asyncio.get_event_loop() if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) @@ -206,12 +204,12 @@ class Lcm: # try new RO, if fail old RO try: self.main_config.RO.uri = ro_uri + "ro" - ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = NgRoClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = True except Exception: self.main_config.RO.uri = ro_uri + "openmano" - ro_server = ROClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = ROClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = False if versiontuple(ro_version) < versiontuple(min_RO_version): @@ -263,7 +261,6 @@ class Lcm: "worker_id": self.worker_id, "version": lcm_version, }, - self.loop, ) # time between pings are low when it is not received and at starting wait_time = ( @@ -274,7 +271,7 @@ class Lcm: if not self.pings_not_received: kafka_has_received = True self.pings_not_received += 1 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) if self.pings_not_received > 10: raise LcmException("It is not receiving pings from Kafka bus") consecutive_errors = 0 @@ -296,7 +293,7 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) def kafka_read_callback(self, topic, command, params): order_id = 1 @@ -318,7 +315,7 @@ class Lcm: sys.stdout.flush() return elif command == "test": - asyncio.Task(self.test(params), loop=self.loop) + asyncio.Task(self.test(params)) return if topic == "admin": @@ -673,11 +670,10 @@ class Lcm: topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.loop, self.kafka_read_callback, from_beginning=True + topics, self.kafka_read_callback, from_beginning=True ), self.msg_admin.aioread( topics_admin, - self.loop, self.kafka_read_callback, group_id=False, ), @@ -701,43 +697,35 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not self.first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def start(self): + async def kafka_read_ping(self): + await asyncio.gather(self.kafka_read(), self.kafka_ping()) + async def start(self): # check RO version - self.loop.run_until_complete(self.check_RO_version()) + await self.check_RO_version() - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config) # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns - ) - self.vim = vim_sdn.VimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.wim = vim_sdn.WimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.sdn = vim_sdn.SdnLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns ) + self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.vca = vim_sdn.VcaLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) - self.loop.run_until_complete( - asyncio.gather(self.kafka_read(), self.kafka_ping()) - ) + await self.kafka_read_ping() # TODO # self.logger.debug("Terminating cancelling creation tasks") @@ -745,12 +733,10 @@ class Lcm: # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") - # await asyncio.sleep(2, loop=self.loop) + # await asyncio.sleep(2) # timeout -= 2 # if not timeout: # self.lcm_tasks.cancel("ALL", "ALL") - self.loop.close() - self.loop = None if self.db: self.db.db_disconnect() if self.msg: @@ -804,7 +790,6 @@ def usage(): if __name__ == "__main__": - try: # print("SYS.PATH='{}'".format(sys.path)) # load parameters and configuration @@ -859,7 +844,7 @@ if __name__ == "__main__": ) exit(1) lcm = Lcm(config_file) - lcm.start() + asyncio.run(lcm.start()) except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage()