X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=9b62d82eef07a337eebb1445c675f0de256919ff;hb=b6049d378301d13e0ddaadb8433b8ad20f4f8a23;hp=9f0e31061a7a36c1e91ceedeb2cd80c0a35cb0bb;hpb=a27dc53c6acd967ea17f0d720a82b23a8404cbfa;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 9f0e310..9b62d82 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -19,7 +19,6 @@ # DEBUG WITH PDB -import os import pdb import asyncio @@ -28,7 +27,7 @@ import logging import logging.handlers import getopt import sys -import configparser +from random import SystemRandom from osm_lcm import ns, vim_sdn, netslice from osm_lcm.ng_ro import NgRoException, NgRoClient @@ -47,12 +46,11 @@ from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem from osm_lcm.data_utils.lcm_config import LcmCfg from osm_lcm.lcm_hc import get_health_check_file -from os import path -from random import choice as random_choice +from os import path, getenv from n2vc import version as n2vc_version import traceback -if os.getenv("OSMLCM_PDB_DEBUG", None) is not None: +if getenv("OSMLCM_PDB_DEBUG", None) is not None: pdb.set_trace() @@ -64,7 +62,6 @@ min_common_version = "0.1.19" class Lcm: - ping_interval_pace = ( 120 # how many time ping is send once is confirmed all is running ) @@ -72,7 +69,7 @@ class Lcm: main_config = LcmCfg() - def __init__(self, config_file, loop=None): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -98,7 +95,6 @@ class Lcm: self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict())) # TODO: check if lcm_hc.py is necessary self.health_check_file = get_health_check_file(self.main_config.to_dict()) - self.loop = loop or asyncio.get_event_loop() self.ns = ( self.netslice ) = ( @@ -170,7 +166,7 @@ class Lcm: # copy message configuration in order to remove 'group_id' for msg_admin config_message = self.main_config.message.to_dict() - config_message["loop"] = self.loop + config_message["loop"] = asyncio.get_event_loop() if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) @@ -207,12 +203,12 @@ class Lcm: # try new RO, if fail old RO try: self.main_config.RO.uri = ro_uri + "ro" - ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = NgRoClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = True except Exception: self.main_config.RO.uri = ro_uri + "openmano" - ro_server = ROClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = ROClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = False if versiontuple(ro_version) < versiontuple(min_RO_version): @@ -264,7 +260,6 @@ class Lcm: "worker_id": self.worker_id, "version": lcm_version, }, - self.loop, ) # time between pings are low when it is not received and at starting wait_time = ( @@ -275,7 +270,7 @@ class Lcm: if not self.pings_not_received: kafka_has_received = True self.pings_not_received += 1 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) if self.pings_not_received > 10: raise LcmException("It is not receiving pings from Kafka bus") consecutive_errors = 0 @@ -297,9 +292,9 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) - def kafka_read_callback(self, topic, command, params): + async def kafka_read_callback(self, topic, command, params): order_id = 1 if topic != "admin" and command != "ping": @@ -319,7 +314,7 @@ class Lcm: sys.stdout.flush() return elif command == "test": - asyncio.Task(self.test(params), loop=self.loop) + asyncio.Task(self.test(params)) return if topic == "admin": @@ -337,6 +332,44 @@ class Lcm: ) ) return + elif topic == "nslcmops": + if command == "cancel": + nslcmop_id = params["_id"] + self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id)) + nsr_id = params["nsInstanceId"] + # cancel the tasks and wait + for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id): + try: + await task + self.logger.debug( + "Cancelled task ended {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + except asyncio.CancelledError: + self.logger.debug( + "Task already cancelled and finished {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + # update DB + q_filter = {"_id": nslcmop_id} + update_dict = { + "operationState": "FAILED_TEMP", + "isCancelPending": False, + } + unset_dict = { + "cancelMode": None, + } + self.db.set_one( + "nslcmops", + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + unset=unset_dict, + ) + self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id)) + return elif topic == "pla": if command == "placement": self.ns.update_nsrs_with_pla_result(params) @@ -349,6 +382,13 @@ class Lcm: "k8scluster", k8scluster_id, order_id, "k8scluster_create", task ) return + elif command == "edit" or command == "edited": + k8scluster_id = params.get("_id") + task = asyncio.ensure_future(self.k8scluster.edit(params, order_id)) + self.lcm_tasks.register( + "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task + ) + return elif command == "delete" or command == "deleted": k8scluster_id = params.get("_id") task = asyncio.ensure_future(self.k8scluster.delete(params, order_id)) @@ -362,6 +402,11 @@ class Lcm: task = asyncio.ensure_future(self.vca.create(params, order_id)) self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task) return + elif command == "edit" or command == "edited": + vca_id = params.get("_id") + task = asyncio.ensure_future(self.vca.edit(params, order_id)) + self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task) + return elif command == "delete" or command == "deleted": vca_id = params.get("_id") task = asyncio.ensure_future(self.vca.delete(params, order_id)) @@ -481,7 +526,7 @@ class Lcm: db_nsr["config-status"], db_nsr["detailed-status"], db_nsr["_admin"]["deployed"], - self.lcm_ns_tasks.get(nsr_id), + self.lcm_tasks.task_registry["ns"].get(nsr_id, ""), ) ) except Exception as e: @@ -543,7 +588,7 @@ class Lcm: db_nsir["config-status"], db_nsir["detailed-status"], db_nsir["_admin"]["deployed"], - self.lcm_netslice_tasks.get(nsir_id), + self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""), ) ) except Exception as e: @@ -643,7 +688,6 @@ class Lcm: self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: @@ -658,16 +702,18 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.loop, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.loop, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -690,43 +736,34 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not self.first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def start(self): + async def kafka_read_ping(self): + await asyncio.gather(self.kafka_read(), self.kafka_ping()) + async def start(self): # check RO version - self.loop.run_until_complete(self.check_RO_version()) + await self.check_RO_version() - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config) # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns - ) - self.vim = vim_sdn.VimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.wim = vim_sdn.WimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.sdn = vim_sdn.SdnLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns ) + self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.vca = vim_sdn.VcaLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) - self.loop.run_until_complete( - asyncio.gather(self.kafka_read(), self.kafka_ping()) - ) + await self.kafka_read_ping() # TODO # self.logger.debug("Terminating cancelling creation tasks") @@ -734,12 +771,10 @@ class Lcm: # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") - # await asyncio.sleep(2, loop=self.loop) + # await asyncio.sleep(2) # timeout -= 2 # if not timeout: # self.lcm_tasks.cancel("ALL", "ALL") - self.loop.close() - self.loop = None if self.db: self.db.db_disconnect() if self.msg: @@ -750,37 +785,12 @@ class Lcm: self.fs.fs_disconnect() def read_config_file(self, config_file): - # TODO make a [ini] + yaml inside parser - # the configparser library is not suitable, because it does not admit comments at the end of line, - # and not parse integer or boolean - conf = {} try: - # read file as yaml format - config = configparser.ConfigParser(inline_comment_prefixes="#") - config.read(config_file) - conf = {s: dict(config.items(s)) for s in config.sections()} + with open(config_file) as f: + return yaml.safe_load(f) except Exception as e: self.logger.critical("At config file '{}': {}".format(config_file, e)) - self.logger.critical("Trying to load config as legacy mode") - try: - with open(config_file) as f: - conf = yaml.safe_load(f) - # Ensure all sections are not empty - for k in ( - "global", - "timeout", - "RO", - "VCA", - "database", - "storage", - "message", - ): - if not conf.get(k): - conf[k] = {} - except Exception as e: - self.logger.critical("At config file '{}': {}".format(config_file, e)) - exit(1) - return conf + exit(1) @staticmethod def get_process_id(): @@ -789,18 +799,22 @@ class Lcm: will provide a random one :return: Obtained ID """ - # Try getting docker id. If fails, get pid - try: - with open("/proc/self/cgroup", "r") as f: - text_id_ = f.readline() - _, _, text_id = text_id_.rpartition("/") - text_id = text_id.replace("\n", "")[:12] - if text_id: - return text_id - except Exception: - pass - # Return a random id - return "".join(random_choice("0123456789abcdef") for _ in range(12)) + + def get_docker_id(): + try: + with open("/proc/self/cgroup", "r") as f: + text_id_ = f.readline() + _, _, text_id = text_id_.rpartition("/") + return text_id.replace("\n", "")[:12] + except Exception: + return None + + def generate_random_id(): + return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12)) + + # Try getting docker id. If it fails, generate a random id + docker_id = get_docker_id() + return docker_id if docker_id else generate_random_id() def usage(): @@ -818,7 +832,6 @@ def usage(): if __name__ == "__main__": - try: # print("SYS.PATH='{}'".format(sys.path)) # load parameters and configuration @@ -842,14 +855,9 @@ if __name__ == "__main__": from osm_lcm.lcm_hc import health_check health_check(config_file, Lcm.ping_interval_pace) - # elif o == "--log-socket-port": - # log_socket_port = a - # elif o == "--log-socket-host": - # log_socket_host = a - # elif o == "--log-file": - # log_file = a else: - assert False, "Unhandled option" + print(f"Unhandled option: {o}") + exit(1) if config_file: if not path.isfile(config_file): @@ -873,7 +881,7 @@ if __name__ == "__main__": ) exit(1) lcm = Lcm(config_file) - lcm.start() + asyncio.run(lcm.start()) except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage()