X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=d6c10e8fbcd54466abb80f615a144b58f0a97ea9;hb=HEAD;hp=5638943c6fb01bd934bf653355fbe30a402e7cb5;hpb=a89a5a7e3b0421a5ae1859235dfef6b6adf24279;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 5638943..f8a97ae 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -19,15 +19,16 @@ # DEBUG WITH PDB -import os import pdb +import os import asyncio import yaml import logging import logging.handlers import getopt import sys +from random import SystemRandom from osm_lcm import ns, vim_sdn, netslice from osm_lcm.ng_ro import NgRoException, NgRoClient @@ -46,12 +47,11 @@ from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem from osm_lcm.data_utils.lcm_config import LcmCfg from osm_lcm.lcm_hc import get_health_check_file -from os import path -from random import choice as random_choice +from os import path, getenv from n2vc import version as n2vc_version import traceback -if os.getenv("OSMLCM_PDB_DEBUG", None) is not None: +if getenv("OSMLCM_PDB_DEBUG", None) is not None: pdb.set_trace() @@ -63,7 +63,6 @@ min_common_version = "0.1.19" class Lcm: - ping_interval_pace = ( 120 # how many time ping is send once is confirmed all is running ) @@ -71,7 +70,7 @@ class Lcm: main_config = LcmCfg() - def __init__(self, config_file, loop=None): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -97,7 +96,6 @@ class Lcm: self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict())) # TODO: check if lcm_hc.py is necessary self.health_check_file = get_health_check_file(self.main_config.to_dict()) - self.loop = loop or asyncio.get_event_loop() self.ns = ( self.netslice ) = ( @@ -169,7 +167,7 @@ class Lcm: # copy message configuration in order to remove 'group_id' for msg_admin config_message = self.main_config.message.to_dict() - config_message["loop"] = self.loop + config_message["loop"] = asyncio.get_event_loop() if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) @@ -206,12 +204,12 @@ class Lcm: # try new RO, if fail old RO try: self.main_config.RO.uri = ro_uri + "ro" - ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = NgRoClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = True except Exception: self.main_config.RO.uri = ro_uri + "openmano" - ro_server = ROClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = ROClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = False if versiontuple(ro_version) < versiontuple(min_RO_version): @@ -263,7 +261,6 @@ class Lcm: "worker_id": self.worker_id, "version": lcm_version, }, - self.loop, ) # time between pings are low when it is not received and at starting wait_time = ( @@ -274,7 +271,7 @@ class Lcm: if not self.pings_not_received: kafka_has_received = True self.pings_not_received += 1 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) if self.pings_not_received > 10: raise LcmException("It is not receiving pings from Kafka bus") consecutive_errors = 0 @@ -296,9 +293,9 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) - def kafka_read_callback(self, topic, command, params): + async def kafka_read_callback(self, topic, command, params): order_id = 1 if topic != "admin" and command != "ping": @@ -318,7 +315,7 @@ class Lcm: sys.stdout.flush() return elif command == "test": - asyncio.Task(self.test(params), loop=self.loop) + asyncio.Task(self.test(params)) return if topic == "admin": @@ -336,6 +333,44 @@ class Lcm: ) ) return + elif topic == "nslcmops": + if command == "cancel": + nslcmop_id = params["_id"] + self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id)) + nsr_id = params["nsInstanceId"] + # cancel the tasks and wait + for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id): + try: + await task + self.logger.debug( + "Cancelled task ended {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + except asyncio.CancelledError: + self.logger.debug( + "Task already cancelled and finished {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + # update DB + q_filter = {"_id": nslcmop_id} + update_dict = { + "operationState": "FAILED_TEMP", + "isCancelPending": False, + } + unset_dict = { + "cancelMode": None, + } + self.db.set_one( + "nslcmops", + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + unset=unset_dict, + ) + self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id)) + return elif topic == "pla": if command == "placement": self.ns.update_nsrs_with_pla_result(params) @@ -348,6 +383,13 @@ class Lcm: "k8scluster", k8scluster_id, order_id, "k8scluster_create", task ) return + elif command == "edit" or command == "edited": + k8scluster_id = params.get("_id") + task = asyncio.ensure_future(self.k8scluster.edit(params, order_id)) + self.lcm_tasks.register( + "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task + ) + return elif command == "delete" or command == "deleted": k8scluster_id = params.get("_id") task = asyncio.ensure_future(self.k8scluster.delete(params, order_id)) @@ -361,6 +403,11 @@ class Lcm: task = asyncio.ensure_future(self.vca.create(params, order_id)) self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task) return + elif command == "edit" or command == "edited": + vca_id = params.get("_id") + task = asyncio.ensure_future(self.vca.edit(params, order_id)) + self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task) + return elif command == "delete" or command == "deleted": vca_id = params.get("_id") task = asyncio.ensure_future(self.vca.delete(params, order_id)) @@ -480,7 +527,7 @@ class Lcm: db_nsr["config-status"], db_nsr["detailed-status"], db_nsr["_admin"]["deployed"], - self.lcm_ns_tasks.get(nsr_id), + self.lcm_tasks.task_registry["ns"].get(nsr_id, ""), ) ) except Exception as e: @@ -542,7 +589,7 @@ class Lcm: db_nsir["config-status"], db_nsir["detailed-status"], db_nsir["_admin"]["deployed"], - self.lcm_netslice_tasks.get(nsir_id), + self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""), ) ) except Exception as e: @@ -642,7 +689,6 @@ class Lcm: self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: @@ -657,16 +703,18 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.loop, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.loop, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -689,43 +737,34 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not self.first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def start(self): + async def kafka_read_ping(self): + await asyncio.gather(self.kafka_read(), self.kafka_ping()) + async def start(self): # check RO version - self.loop.run_until_complete(self.check_RO_version()) + await self.check_RO_version() - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config) # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns - ) - self.vim = vim_sdn.VimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.wim = vim_sdn.WimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.sdn = vim_sdn.SdnLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns ) + self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.vca = vim_sdn.VcaLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) - self.loop.run_until_complete( - asyncio.gather(self.kafka_read(), self.kafka_ping()) - ) + await self.kafka_read_ping() # TODO # self.logger.debug("Terminating cancelling creation tasks") @@ -733,12 +772,10 @@ class Lcm: # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") - # await asyncio.sleep(2, loop=self.loop) + # await asyncio.sleep(2) # timeout -= 2 # if not timeout: # self.lcm_tasks.cancel("ALL", "ALL") - self.loop.close() - self.loop = None if self.db: self.db.db_disconnect() if self.msg: @@ -763,18 +800,22 @@ class Lcm: will provide a random one :return: Obtained ID """ - # Try getting docker id. If fails, get pid - try: - with open("/proc/self/cgroup", "r") as f: - text_id_ = f.readline() - _, _, text_id = text_id_.rpartition("/") - text_id = text_id.replace("\n", "")[:12] - if text_id: - return text_id - except Exception: - pass - # Return a random id - return "".join(random_choice("0123456789abcdef") for _ in range(12)) + + def get_docker_id(): + try: + with open("/proc/self/cgroup", "r") as f: + text_id_ = f.readline() + _, _, text_id = text_id_.rpartition("/") + return text_id.replace("\n", "")[:12] + except Exception: + return None + + def generate_random_id(): + return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12)) + + # Try getting docker id. If it fails, generate a random id + docker_id = get_docker_id() + return docker_id if docker_id else generate_random_id() def usage(): @@ -792,7 +833,6 @@ def usage(): if __name__ == "__main__": - try: # print("SYS.PATH='{}'".format(sys.path)) # load parameters and configuration @@ -816,14 +856,9 @@ if __name__ == "__main__": from osm_lcm.lcm_hc import health_check health_check(config_file, Lcm.ping_interval_pace) - # elif o == "--log-socket-port": - # log_socket_port = a - # elif o == "--log-socket-host": - # log_socket_host = a - # elif o == "--log-file": - # log_file = a else: - assert False, "Unhandled option" + print(f"Unhandled option: {o}") + exit(1) if config_file: if not path.isfile(config_file): @@ -846,8 +881,9 @@ if __name__ == "__main__": file=sys.stderr, ) exit(1) + config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file))) lcm = Lcm(config_file) - lcm.start() + asyncio.run(lcm.start()) except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage()