X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=2fc479feb3da55673ee9632f3950a1989eba26ab;hb=cf47a3b2ef1fe3e095c81dc6033c67b067adb08c;hp=9f0e31061a7a36c1e91ceedeb2cd80c0a35cb0bb;hpb=a27dc53c6acd967ea17f0d720a82b23a8404cbfa;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 9f0e310..2fc479f 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -28,7 +28,6 @@ import logging import logging.handlers import getopt import sys -import configparser from osm_lcm import ns, vim_sdn, netslice from osm_lcm.ng_ro import NgRoException, NgRoClient @@ -64,7 +63,6 @@ min_common_version = "0.1.19" class Lcm: - ping_interval_pace = ( 120 # how many time ping is send once is confirmed all is running ) @@ -72,7 +70,7 @@ class Lcm: main_config = LcmCfg() - def __init__(self, config_file, loop=None): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -98,7 +96,6 @@ class Lcm: self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict())) # TODO: check if lcm_hc.py is necessary self.health_check_file = get_health_check_file(self.main_config.to_dict()) - self.loop = loop or asyncio.get_event_loop() self.ns = ( self.netslice ) = ( @@ -170,7 +167,7 @@ class Lcm: # copy message configuration in order to remove 'group_id' for msg_admin config_message = self.main_config.message.to_dict() - config_message["loop"] = self.loop + config_message["loop"] = asyncio.get_event_loop() if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) @@ -207,12 +204,12 @@ class Lcm: # try new RO, if fail old RO try: self.main_config.RO.uri = ro_uri + "ro" - ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = NgRoClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = True except Exception: self.main_config.RO.uri = ro_uri + "openmano" - ro_server = ROClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = ROClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = False if versiontuple(ro_version) < versiontuple(min_RO_version): @@ -264,7 +261,6 @@ class Lcm: "worker_id": self.worker_id, "version": lcm_version, }, - self.loop, ) # time between pings are low when it is not received and at starting wait_time = ( @@ -275,7 +271,7 @@ class Lcm: if not self.pings_not_received: kafka_has_received = True self.pings_not_received += 1 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) if self.pings_not_received > 10: raise LcmException("It is not receiving pings from Kafka bus") consecutive_errors = 0 @@ -297,7 +293,7 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) def kafka_read_callback(self, topic, command, params): order_id = 1 @@ -319,7 +315,7 @@ class Lcm: sys.stdout.flush() return elif command == "test": - asyncio.Task(self.test(params), loop=self.loop) + asyncio.Task(self.test(params)) return if topic == "admin": @@ -349,6 +345,13 @@ class Lcm: "k8scluster", k8scluster_id, order_id, "k8scluster_create", task ) return + elif command == "edit" or command == "edited": + k8scluster_id = params.get("_id") + task = asyncio.ensure_future(self.k8scluster.edit(params, order_id)) + self.lcm_tasks.register( + "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task + ) + return elif command == "delete" or command == "deleted": k8scluster_id = params.get("_id") task = asyncio.ensure_future(self.k8scluster.delete(params, order_id)) @@ -362,6 +365,11 @@ class Lcm: task = asyncio.ensure_future(self.vca.create(params, order_id)) self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task) return + elif command == "edit" or command == "edited": + vca_id = params.get("_id") + task = asyncio.ensure_future(self.vca.edit(params, order_id)) + self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task) + return elif command == "delete" or command == "deleted": vca_id = params.get("_id") task = asyncio.ensure_future(self.vca.delete(params, order_id)) @@ -481,7 +489,7 @@ class Lcm: db_nsr["config-status"], db_nsr["detailed-status"], db_nsr["_admin"]["deployed"], - self.lcm_ns_tasks.get(nsr_id), + self.lcm_tasks.task_registry["ns"].get(nsr_id, ""), ) ) except Exception as e: @@ -543,7 +551,7 @@ class Lcm: db_nsir["config-status"], db_nsir["detailed-status"], db_nsir["_admin"]["deployed"], - self.lcm_netslice_tasks.get(nsir_id), + self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""), ) ) except Exception as e: @@ -662,11 +670,10 @@ class Lcm: topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.loop, self.kafka_read_callback, from_beginning=True + topics, self.kafka_read_callback, from_beginning=True ), self.msg_admin.aioread( topics_admin, - self.loop, self.kafka_read_callback, group_id=False, ), @@ -690,43 +697,35 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not self.first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def start(self): + async def kafka_read_ping(self): + await asyncio.gather(self.kafka_read(), self.kafka_ping()) + async def start(self): # check RO version - self.loop.run_until_complete(self.check_RO_version()) + await self.check_RO_version() - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config) # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns - ) - self.vim = vim_sdn.VimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.wim = vim_sdn.WimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.sdn = vim_sdn.SdnLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns ) + self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.vca = vim_sdn.VcaLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) - self.loop.run_until_complete( - asyncio.gather(self.kafka_read(), self.kafka_ping()) - ) + await self.kafka_read_ping() # TODO # self.logger.debug("Terminating cancelling creation tasks") @@ -734,12 +733,10 @@ class Lcm: # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") - # await asyncio.sleep(2, loop=self.loop) + # await asyncio.sleep(2) # timeout -= 2 # if not timeout: # self.lcm_tasks.cancel("ALL", "ALL") - self.loop.close() - self.loop = None if self.db: self.db.db_disconnect() if self.msg: @@ -750,37 +747,12 @@ class Lcm: self.fs.fs_disconnect() def read_config_file(self, config_file): - # TODO make a [ini] + yaml inside parser - # the configparser library is not suitable, because it does not admit comments at the end of line, - # and not parse integer or boolean - conf = {} try: - # read file as yaml format - config = configparser.ConfigParser(inline_comment_prefixes="#") - config.read(config_file) - conf = {s: dict(config.items(s)) for s in config.sections()} + with open(config_file) as f: + return yaml.safe_load(f) except Exception as e: self.logger.critical("At config file '{}': {}".format(config_file, e)) - self.logger.critical("Trying to load config as legacy mode") - try: - with open(config_file) as f: - conf = yaml.safe_load(f) - # Ensure all sections are not empty - for k in ( - "global", - "timeout", - "RO", - "VCA", - "database", - "storage", - "message", - ): - if not conf.get(k): - conf[k] = {} - except Exception as e: - self.logger.critical("At config file '{}': {}".format(config_file, e)) - exit(1) - return conf + exit(1) @staticmethod def get_process_id(): @@ -818,7 +790,6 @@ def usage(): if __name__ == "__main__": - try: # print("SYS.PATH='{}'".format(sys.path)) # load parameters and configuration @@ -873,7 +844,7 @@ if __name__ == "__main__": ) exit(1) lcm = Lcm(config_file) - lcm.start() + asyncio.run(lcm.start()) except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage()