X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=d6c10e8fbcd54466abb80f615a144b58f0a97ea9;hb=HEAD;hp=82947e9d73da21304730277e3b5bffd887c17172;hpb=ca7ece02d870a16bdcd47dab7c5ac30c82c58545;p=osm%2FLCM.git

diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 82947e9d..77c3d82f 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -19,17 +19,18 @@
 
 # DEBUG WITH PDB
-import os
 import pdb
+import os
 
 import asyncio
 import yaml
 import logging
 import logging.handlers
 import getopt
 import sys
+from random import SystemRandom
 
-from osm_lcm import ns, vim_sdn, netslice
+from osm_lcm import ns, vim_sdn, netslice, k8s
 from osm_lcm.ng_ro import NgRoException, NgRoClient
 from osm_lcm.ROclient import ROClient, ROClientException
 
@@ -44,12 +45,14 @@
 from osm_common.fsbase import FsException
 from osm_common.msgbase import MsgException
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
-from os import environ, path
-from random import choice as random_choice
+from osm_lcm.data_utils.lcm_config import LcmCfg
+from osm_lcm.data_utils.list_utils import find_in_list
+from osm_lcm.lcm_hc import get_health_check_file
+from os import path, getenv
 from n2vc import version as n2vc_version
 import traceback
 
-if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
+if getenv("OSMLCM_PDB_DEBUG", None) is not None:
     pdb.set_trace()
 
@@ -58,26 +61,24 @@
 min_RO_version = "6.0.2"
 min_n2vc_version = "0.0.2"
 
 min_common_version = "0.1.19"
-health_check_file = (
-    path.expanduser("~") + "/time_last_ping"
-)  # TODO find better location for this file
 
 
 class Lcm:
+    profile_collection_mapping = {
+        "infra_controller_profiles": "k8sinfra_controller",
+        "infra_config_profiles": "k8sinfra_config",
+        "resource_profiles": "k8sresource",
+        "app_profiles": "k8sapp",
+    }
+
     ping_interval_pace = (
         120  # how many time ping is send once is confirmed all is running
     )
     ping_interval_boot = 5  # how many time ping is sent when booting
-    cfg_logger_name = {
-        "message": "lcm.msg",
-        "database": "lcm.db",
-        "storage": "lcm.fs",
-        "tsdb": "lcm.prometheus",
-    }
-    # ^ contains for each section at lcm.cfg the used logger name
 
-    def __init__(self, config_file, loop=None):
+    main_config = LcmCfg()
+
+    def __init__(self, config_file):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -97,32 +98,31 @@ class Lcm:
         self.worker_id = self.get_process_id()
         # load configuration
         config = self.read_config_file(config_file)
-        self.config = config
-        self.config["ro_config"] = {
-            "ng": config["RO"].get("ng", False),
-            "uri": config["RO"].get("uri"),
-            "tenant": config.get("tenant", "osm"),
-            "logger_name": "lcm.roclient",
-            "loglevel": config["RO"].get("loglevel", "ERROR"),
-        }
-        if not self.config["ro_config"]["uri"]:
-            self.config["ro_config"]["uri"] = "http://{}:{}/".format(
-                config["RO"]["host"], config["RO"]["port"]
-            )
-        elif (
-            "/ro" in self.config["ro_config"]["uri"][-4:]
-            or "/openmano" in self.config["ro_config"]["uri"][-10:]
-        ):
-            # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
-            index = self.config["ro_config"]["uri"][-1].rfind("/")
-            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index + 1]
-
-        self.loop = loop or asyncio.get_event_loop()
+        self.main_config.set_from_dict(config)
+        self.main_config.transform()
+        self.main_config.load_from_env()
+        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
+        # TODO: check if lcm_hc.py is necessary
+        self.health_check_file = get_health_check_file(self.main_config.to_dict())
         self.ns = (
             self.netslice
         ) = (
             self.vim
-        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
+        ) = (
+            self.wim
+        ) = (
+            self.sdn
+        ) = (
+            self.k8scluster
+        ) = (
+            self.vca
+        ) = (
+            self.k8srepo
+        ) = (
+            self.cluster
+        ) = (
+            self.k8s_app
+        ) = self.k8s_resource = self.k8s_infra_controller = self.k8s_infra_config = None
 
         # logging
         log_format_simple = (
@@ -131,35 +131,35 @@ class Lcm:
         log_formatter_simple = logging.Formatter(
             log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
         )
-        config["database"]["logger_name"] = "lcm.db"
-        config["storage"]["logger_name"] = "lcm.fs"
-        config["message"]["logger_name"] = "lcm.msg"
-        if config["global"].get("logfile"):
+        if self.main_config.globalConfig.logfile:
             file_handler = logging.handlers.RotatingFileHandler(
-                config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
+                self.main_config.globalConfig.logfile,
+                maxBytes=100e6,
+                backupCount=9,
+                delay=0,
             )
             file_handler.setFormatter(log_formatter_simple)
             self.logger.addHandler(file_handler)
-        if not config["global"].get("nologging"):
+        if not self.main_config.globalConfig.to_dict()["nologging"]:
             str_handler = logging.StreamHandler()
             str_handler.setFormatter(log_formatter_simple)
             self.logger.addHandler(str_handler)
-        if config["global"].get("loglevel"):
-            self.logger.setLevel(config["global"]["loglevel"])
+        if self.main_config.globalConfig.to_dict()["loglevel"]:
+            self.logger.setLevel(self.main_config.globalConfig.loglevel)
 
         # logging other modules
-        for k1, logname in self.cfg_logger_name.items():
-            config[k1]["logger_name"] = logname
-            logger_module = logging.getLogger(logname)
-            if config[k1].get("logfile"):
+        for logger in ("message", "database", "storage", "tsdb"):
+            logger_config = self.main_config.to_dict()[logger]
+            logger_module = logging.getLogger(logger_config["logger_name"])
+            if logger_config["logfile"]:
                 file_handler = logging.handlers.RotatingFileHandler(
-                    config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
+                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                 )
                 file_handler.setFormatter(log_formatter_simple)
                 logger_module.addHandler(file_handler)
-            if config[k1].get("loglevel"):
-                logger_module.setLevel(config[k1]["loglevel"])
+            if logger_config["loglevel"]:
+                logger_module.setLevel(logger_config["loglevel"])
         self.logger.critical(
             "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
         )
@@ -182,14 +182,14 @@ class Lcm:
         )
 
         try:
-            self.db = Database(config).instance.db
+            self.db = Database(self.main_config.to_dict()).instance.db
 
-            self.fs = Filesystem(config).instance.fs
+            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
             self.fs.sync()
 
             # copy message configuration in order to remove 'group_id' for msg_admin
-            config_message = config["message"].copy()
-            config_message["loop"] = self.loop
+            config_message = self.main_config.message.to_dict()
+            config_message["loop"] = asyncio.get_event_loop()
             if config_message["driver"] == "local":
                 self.msg = msglocal.MsgLocal()
                 self.msg.connect(config_message)
@@ -205,7 +205,7 @@
             else:
                 raise LcmException(
                     "Invalid configuration param '{}' at '[message]':'driver'".format(
-                        config["message"]["driver"]
+                        self.main_config.message.driver
                     )
                 )
         except (DbException, FsException, MsgException) as e:
@@ -215,23 +215,31 @@ class Lcm:
         # contains created tasks/futures to be able to cancel
         self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
+        self.logger.info(
+            "Worker_id: {} main_config: {} lcm tasks: {}".format(
+                self.worker_id, self.main_config, self.lcm_tasks
+            )
+        )
+
     async def check_RO_version(self):
         tries = 14
         last_error = None
         while True:
-            ro_uri = self.config["ro_config"]["uri"]
+            ro_uri = self.main_config.RO.uri
+            if not ro_uri:
+                ro_uri = ""
             try:
                 # try new RO, if fail old RO
                 try:
-                    self.config["ro_config"]["uri"] = ro_uri + "ro"
-                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
+                    self.main_config.RO.uri = ro_uri + "ro"
+                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                     ro_version = await ro_server.get_version()
-                    self.config["ro_config"]["ng"] = True
+                    self.main_config.RO.ng = True
                 except Exception:
-                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
-                    ro_server = ROClient(self.loop, **self.config["ro_config"])
+                    self.main_config.RO.uri = ro_uri + "openmano"
+                    ro_server = ROClient(**self.main_config.RO.to_dict())
                     ro_version = await ro_server.get_version()
-                    self.config["ro_config"]["ng"] = False
+                    self.main_config.RO.ng = False
                 if versiontuple(ro_version) < versiontuple(min_RO_version):
                     raise LcmException(
                         "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
@@ -240,16 +248,16 @@ class Lcm:
                     )
                 self.logger.info(
                     "Connected to RO version {} new-generation version {}".format(
-                        ro_version, self.config["ro_config"]["ng"]
+                        ro_version, self.main_config.RO.ng
                     )
                 )
                 return
             except (ROClientException, NgRoException) as e:
-                self.config["ro_config"]["uri"] = ro_uri
+                self.main_config.RO.uri = ro_uri
                 tries -= 1
                 traceback.print_tb(e.__traceback__)
                 error_text = "Error while connecting to RO on {}: {}".format(
-                    self.config["ro_config"]["uri"], e
+                    self.main_config.RO.uri, e
                 )
                 if tries <= 0:
                     self.logger.critical(error_text)
@@ -281,7 +289,6 @@ class Lcm:
                         "worker_id": self.worker_id,
                         "version": lcm_version,
                     },
-                    self.loop,
                 )
                 # time between pings are low when it is not received and at starting
                 wait_time = (
@@ -292,7 +299,7 @@
                 if not self.pings_not_received:
                     kafka_has_received = True
                 self.pings_not_received += 1
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
                 if self.pings_not_received > 10:
                     raise LcmException("It is not receiving pings from Kafka bus")
                 consecutive_errors = 0
@@ -314,11 +321,22 @@
                     "Task kafka_read retrying after Exception {}".format(e)
                 )
                 wait_time = 2 if not first_start else 5
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
 
-    def kafka_read_callback(self, topic, command, params):
-        order_id = 1
+    def get_operation_params(self, item, operation_id):
+        operation_history = item.get("operationHistory", [])
+        operation = find_in_list(
+            operation_history, lambda op: op["op_id"] == operation_id
+        )
+        return operation.get("operationParams", {})
 
+    async def kafka_read_callback(self, topic, command, params):
+        order_id = 1
+        self.logger.info(
+            "Topic: {} command: {} params: {} order ID: {}".format(
+                topic, command, params, order_id
+            )
+        )
         if topic != "admin" and command != "ping":
             self.logger.debug(
                 "Task kafka_read receives {} {}: {}".format(topic, command, params)
@@ -326,6 +344,11 @@
             )
         self.consecutive_errors = 0
         self.first_start = False
        order_id += 1
+        self.logger.info(
+            "Consecutive error: {} First start: {}".format(
+                self.consecutive_errors, self.first_start
+            )
+        )
         if command == "exit":
             raise LcmExceptionExit
         elif command.startswith("#"):
@@ -336,7 +359,7 @@
             sys.stdout.flush()
             return
         elif command == "test":
-            asyncio.Task(self.test(params), loop=self.loop)
+            asyncio.Task(self.test(params))
             return
 
         if topic == "admin":
@@ -345,15 +368,53 @@
                     return
                 self.pings_not_received = 0
                 try:
-                    with open(health_check_file, "w") as f:
+                    with open(self.health_check_file, "w") as f:
                         f.write(str(time()))
                 except Exception as e:
                     self.logger.error(
                         "Cannot write into '{}' for healthcheck: {}".format(
-                            health_check_file, e
+                            self.health_check_file, e
                         )
                     )
             return
+        elif topic == "nslcmops":
+            if command == "cancel":
+                nslcmop_id = params["_id"]
+                self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
+                nsr_id = params["nsInstanceId"]
+                # cancel the tasks and wait
+                for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
+                    try:
+                        await task
+                        self.logger.debug(
+                            "Cancelled task ended {},{},{}".format(
+                                nsr_id, nslcmop_id, task
+                            )
+                        )
+                    except asyncio.CancelledError:
+                        self.logger.debug(
+                            "Task already cancelled and finished {},{},{}".format(
+                                nsr_id, nslcmop_id, task
+                            )
+                        )
+                # update DB
+                q_filter = {"_id": nslcmop_id}
+                update_dict = {
+                    "operationState": "FAILED_TEMP",
+                    "isCancelPending": False,
+                }
+                unset_dict = {
+                    "cancelMode": None,
+                }
+                self.db.set_one(
+                    "nslcmops",
+                    q_filter=q_filter,
+                    update_dict=update_dict,
+                    fail_on_empty=False,
+                    unset=unset_dict,
+                )
+                self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
+            return
         elif topic == "pla":
             if command == "placement":
                 self.ns.update_nsrs_with_pla_result(params)
@@ -366,6 +427,13 @@
                     "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                 )
                 return
+            elif command == "edit" or command == "edited":
+                k8scluster_id = params.get("_id")
+                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
+                self.lcm_tasks.register(
+                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
+                )
+                return
             elif command == "delete" or command == "deleted":
                 k8scluster_id = params.get("_id")
                 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
@@ -379,6 +447,11 @@
                 task = asyncio.ensure_future(self.vca.create(params, order_id))
                 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                 return
+            elif command == "edit" or command == "edited":
+                vca_id = params.get("_id")
+                task = asyncio.ensure_future(self.vca.edit(params, order_id))
+                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
+                return
             elif command == "delete" or command == "deleted":
                 vca_id = params.get("_id")
                 task = asyncio.ensure_future(self.vca.delete(params, order_id))
@@ -403,9 +476,15 @@
         elif topic == "ns":
             if command == "instantiate":
                 # self.logger.debug("Deploying NS {}".format(nsr_id))
+                self.logger.info("NS instantiate")
                 nslcmop = params
                 nslcmop_id = nslcmop["_id"]
                 nsr_id = nslcmop["nsInstanceId"]
+                self.logger.info(
+                    "NsLCMOP: {} NsLCMOP_ID:{} nsr_id: {}".format(
+                        nslcmop, nslcmop_id, nsr_id
+                    )
+                )
                 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                 self.lcm_tasks.register(
                     "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                 )
@@ -455,6 +534,14 @@
                 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                 return
+            elif command == "heal":
+                # self.logger.debug("Healing NS {}".format(nsr_id))
+                nslcmop = params
+                nslcmop_id = nslcmop["_id"]
+                nsr_id = nslcmop["nsInstanceId"]
+                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
+                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
+                return
             elif command == "migrate":
                 nslcmop = params
                 nslcmop_id = nslcmop["_id"]
                 nsr_id = nslcmop["nsInstanceId"]
@@ -475,7 +562,7 @@
                         db_nsr["config-status"],
                         db_nsr["detailed-status"],
                         db_nsr["_admin"]["deployed"],
-                        self.lcm_ns_tasks.get(nsr_id),
+                        self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                     )
                 )
             except Exception as e:
@@ -486,12 +573,15 @@
             return  # TODO cleaning of task just in case should be done
         elif command in (
             "vnf_terminated",
+            "policy_updated",
             "terminated",
             "instantiated",
             "scaled",
+            "healed",
             "actioned",
             "updated",
             "migrated",
+            "verticalscaled",
         ):  # "scaled-cooldown-time"
             return
@@ -534,7 +624,7 @@
                         db_nsir["config-status"],
                         db_nsir["detailed-status"],
                         db_nsir["_admin"]["deployed"],
-                        self.lcm_netslice_tasks.get(nsir_id),
+                        self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                     )
                 )
             except Exception as e:
@@ -547,13 +637,29 @@
             "terminated",
             "instantiated",
             "scaled",
+            "healed",
             "actioned",
         ):  # "scaled-cooldown-time"
             return
         elif topic == "vim_account":
             vim_id = params["_id"]
+            op_id = vim_id
+            op_params = params
+            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
+            vim_config = db_vim.get("config", {})
+            self.logger.debug("Db Vim: {}".format(db_vim))
             if command in ("create", "created"):
-                if not self.config["ro_config"].get("ng"):
+                self.logger.debug("Main config: {}".format(self.main_config.to_dict()))
+                if "credentials" in vim_config:
+                    self.logger.info("Vim add cloud credentials")
+                    task = asyncio.ensure_future(
+                        self.cloud_credentials.add(op_id, op_params, db_vim)
+                    )
+                    self.lcm_tasks.register(
+                        "vim_account", vim_id, op_id, "cloud_credentials_add", task
+                    )
+                if not self.main_config.RO.ng:
+                    self.logger.info("Calling RO to create VIM (no NG-RO)")
                     task = asyncio.ensure_future(self.vim.create(params, order_id))
                     self.lcm_tasks.register(
                         "vim_account", vim_id, order_id, "vim_create", task
@@ -561,6 +667,14 @@
                     )
                 return
             elif command == "delete" or command == "deleted":
                 self.lcm_tasks.cancel(topic, vim_id)
+                if "credentials" in vim_config:
+                    self.logger.info("Vim remove cloud credentials")
+                    task = asyncio.ensure_future(
+                        self.cloud_credentials.remove(op_id, op_params, db_vim)
+                    )
+                    self.lcm_tasks.register(
+                        "vim_account", vim_id, op_id, "cloud_credentials_remove", task
+                    )
                 task = asyncio.ensure_future(self.vim.delete(params, order_id))
                 self.lcm_tasks.register(
                     "vim_account", vim_id, order_id, "vim_delete", task
                 )
@@ -571,7 +685,15 @@
                 sys.stdout.flush()
                 return
             elif command in ("edit", "edited"):
-                if not self.config["ro_config"].get("ng"):
+                if "credentials" in vim_config:
+                    self.logger.info("Vim update cloud credentials")
+                    task = asyncio.ensure_future(
+                        self.cloud_credentials.edit(op_id, op_params, db_vim)
+                    )
+                    self.lcm_tasks.register(
+                        "vim_account", vim_id, op_id, "cloud_credentials_update", task
+                    )
+                if not self.main_config.RO.ng:
                     task = asyncio.ensure_future(self.vim.edit(params, order_id))
                     self.lcm_tasks.register(
                         "vim_account", vim_id, order_id, "vim_edit", task
                     )
@@ -582,7 +704,7 @@
         elif topic == "wim_account":
             wim_id = params["_id"]
             if command in ("create", "created"):
-                if not self.config["ro_config"].get("ng"):
+                if not self.main_config.RO.ng:
                     task = asyncio.ensure_future(self.wim.create(params, order_id))
                     self.lcm_tasks.register(
                         "wim_account", wim_id, order_id, "wim_create", task
@@ -610,7 +732,7 @@
         elif topic == "sdn":
             _sdn_id = params["_id"]
             if command in ("create", "created"):
-                if not self.config["ro_config"].get("ng"):
+                if not self.main_config.RO.ng:
                     task = asyncio.ensure_future(self.sdn.create(params, order_id))
                     self.lcm_tasks.register(
                         "sdn", _sdn_id, order_id, "sdn_create", task
                     )
@@ -627,15 +749,338 @@
                 return
             elif command == "deleted":
                 return  # TODO cleaning of task just in case should be done
+        elif topic == "cluster":
+            if command != "get_creds":
+                op_id = params["operation_id"]
+                cluster_id = params["cluster_id"]
+                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+                op_params = self.get_operation_params(db_cluster, op_id)
+                db_content = {
+                    "cluster": db_cluster,
+                }
+            if command == "create" or command == "created":
+                self.logger.debug("cluster_id = {}".format(cluster_id))
+                # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
+                db_vim = self.db.get_one(
+                    "vim_accounts", {"name": db_cluster["vim_account"]}
+                )
+                db_content["vim_account"] = db_vim
+                task = asyncio.ensure_future(
+                    self.cluster.create(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, op_id, "cluster_create", task
+                )
+                return
+            elif command == "delete" or command == "deleted":
+                task = asyncio.ensure_future(
+                    self.cluster.delete(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, op_id, "cluster_delete", task
+                )
+                return
command == "add" or command == "added": + profile_type = params["profile_type"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one( + profile_collection, {"_id": params["profile_id"]} + ) + db_profile["profile_type"] = profile_type + db_content["profile"] = db_profile + task = asyncio.ensure_future( + self.cluster.attach_profile(op_id, op_params, db_content) + ) + self.lcm_tasks.register( + "cluster", cluster_id, op_id, "profile_add", task + ) + return + elif command == "remove" or command == "removed": + profile_type = params["profile_type"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one( + profile_collection, {"_id": params["profile_id"]} + ) + db_profile["profile_type"] = profile_type + db_content["profile"] = db_profile + task = asyncio.ensure_future( + self.cluster.detach_profile(op_id, op_params, db_content) + ) + self.lcm_tasks.register( + "cluster", cluster_id, op_id, "profile_remove", task + ) + return + elif command == "register" or command == "registered": + task = asyncio.ensure_future( + self.cluster.register(op_id, op_params, db_content) + ) + self.lcm_tasks.register( + "cluster", cluster_id, op_id, "cluster_register", task + ) + return + elif command == "deregister" or command == "deregistered": + task = asyncio.ensure_future( + self.cluster.deregister(op_id, op_params, db_content) + ) + self.lcm_tasks.register( + "cluster", cluster_id, op_id, "cluster_deregister", task + ) + return + elif command == "get_creds": + cluster_id = params["_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + task = asyncio.ensure_future(self.cluster.get_creds(db_cluster)) + self.lcm_tasks.register( + "cluster", cluster_id, cluster_id, "cluster_get_credentials", task + ) + return + elif command == "upgrade" or command == "scale": + # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]}) + db_vim = self.db.get_one( + "vim_accounts", {"name": db_cluster["vim_account"]} + ) + db_content["vim_account"] = db_vim + task = asyncio.ensure_future( + self.cluster.update(op_id, op_params, db_content) + ) + self.lcm_tasks.register( + "cluster", cluster_id, op_id, "cluster_update", task + ) + return + elif topic == "k8s_app": + op_id = params["operation_id"] + profile_id = params["profile_id"] + db_profile = self.db.get_one("k8sapp", {"_id": profile_id}) + db_profile["profile_type"] = "applications" + op_params = self.get_operation_params(db_profile, op_id) + if command == "profile_create" or command == "profile_created": + self.logger.debug("Create k8s_app_id = {}".format(profile_id)) + task = asyncio.ensure_future( + self.k8s_app.create(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_app", profile_id, op_id, "k8s_app_create", task + ) + return + elif command == "delete" or command == "deleted": + self.logger.debug("Delete k8s_app_id = {}".format(profile_id)) + task = asyncio.ensure_future( + self.k8s_app.delete(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_app", profile_id, op_id, "k8s_app_delete", task + ) + return + elif topic == "k8s_resource": + op_id = params["operation_id"] + profile_id = params["profile_id"] + db_profile = self.db.get_one("k8sresource", {"_id": profile_id}) + db_profile["profile_type"] = "managed-resources" + op_params = self.get_operation_params(db_profile, op_id) + if command == "profile_create" or command == "profile_created": + self.logger.debug("Create k8s_resource_id = {}".format(profile_id)) + 
task = asyncio.ensure_future( + self.k8s_resource.create(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_resource", + profile_id, + op_id, + "k8s_resource_create", + task, + ) + return + elif command == "delete" or command == "deleted": + self.logger.debug("Delete k8s_resource_id = {}".format(profile_id)) + task = asyncio.ensure_future( + self.k8s_resource.delete(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_resource", + profile_id, + op_id, + "k8s_resource_delete", + task, + ) + return + + elif topic == "k8s_infra_controller": + op_id = params["operation_id"] + profile_id = params["profile_id"] + db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id}) + db_profile["profile_type"] = "infra-controllers" + op_params = self.get_operation_params(db_profile, op_id) + if command == "profile_create" or command == "profile_created": + self.logger.debug( + "Create k8s_infra_controller_id = {}".format(profile_id) + ) + task = asyncio.ensure_future( + self.k8s_infra_controller.create(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_infra_controller", + profile_id, + op_id, + "k8s_infra_controller_create", + task, + ) + return + elif command == "delete" or command == "deleted": + self.logger.debug( + "Delete k8s_infra_controller_id = {}".format(profile_id) + ) + task = asyncio.ensure_future( + self.k8s_infra_controller.delete(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_infra_controller", + profile_id, + op_id, + "k8s_infra_controller_delete", + task, + ) + return + + elif topic == "k8s_infra_config": + op_id = params["operation_id"] + profile_id = params["profile_id"] + db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id}) + db_profile["profile_type"] = "infra-configs" + op_params = self.get_operation_params(db_profile, op_id) + if command == "profile_create" or command == "profile_created": + self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id)) + task = asyncio.ensure_future( + self.k8s_infra_config.create(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_infra_config", + profile_id, + op_id, + "k8s_infra_config_create", + task, + ) + return + elif command == "delete" or command == "deleted": + self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id)) + task = asyncio.ensure_future( + self.k8s_infra_config.delete(op_id, op_params, db_profile) + ) + self.lcm_tasks.register( + "k8s_infra_config", + profile_id, + op_id, + "k8s_infra_config_delete", + task, + ) + return + elif topic == "oka": + op_id = params["operation_id"] + oka_id = params["oka_id"] + db_oka = self.db.get_one("okas", {"_id": oka_id}) + op_params = self.get_operation_params(db_oka, op_id) + if command == "create": + task = asyncio.ensure_future(self.oka.create(op_id, op_params, db_oka)) + self.lcm_tasks.register("oka", oka_id, op_id, "oka_create", task) + return + elif command == "edit": + task = asyncio.ensure_future(self.oka.edit(op_id, op_params, db_oka)) + self.lcm_tasks.register("oka", oka_id, op_id, "oka_edit", task) + return + elif command == "delete": + task = asyncio.ensure_future(self.oka.delete(op_id, op_params, db_oka)) + self.lcm_tasks.register("oka", oka_id, op_id, "oka_delete", task) + return + elif topic == "ksu": + op_id = params["operation_id"] + op_params = None + db_content = None + if not (command == "clone" or command == "move"): + # op_params is a list + # db_content is a list of KSU + db_content = [] + op_params = [] + for ksu_id in 
params["ksus_list"]: + db_ksu = self.db.get_one("ksus", {"_id": ksu_id}) + db_content.append(db_ksu) + ksu_params = {} + if command == "delete": + ksu_params["profile"] = {} + ksu_params["profile"]["profile_type"] = db_ksu["profile"][ + "profile_type" + ] + ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"] + else: + ksu_params = self.get_operation_params(db_ksu, op_id) + # Update ksu_params["profile"] with profile name and age-pubkey + profile_type = ksu_params["profile"]["profile_type"] + profile_id = ksu_params["profile"]["_id"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one( + profile_collection, {"_id": profile_id} + ) + ksu_params["profile"]["name"] = db_profile["name"] + ksu_params["profile"]["age_pubkey"] = db_profile.get( + "age_pubkey", "" + ) + if command == "create" or command == "edit" or command == "edited": + # Update ksu_params["oka"] with sw_catalog_path (when missing) + for oka in ksu_params["oka"]: + if "sw_catalog_path" not in oka: + oka_id = oka["_id"] + db_oka = self.db.get_one("okas", {"_id": oka_id}) + oka[ + "sw_catalog_path" + ] = f"infra-controllers/{db_oka['git_name']}" + op_params.append(ksu_params) + else: + # db_content and op_params are single items + db_content = self.db.get_one("ksus", {"_id": params["_id"]}) + db_content = db_ksu + op_params = self.get_operation_params(db_ksu, op_id) + if command == "create": + task = asyncio.ensure_future( + self.ksu.create(op_id, op_params, db_content) + ) + self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_create", task) + return + elif command == "edit" or command == "edited": + task = asyncio.ensure_future( + self.ksu.edit(op_id, op_params, db_content) + ) + self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_edit", task) + return + elif command == "delete": + task = asyncio.ensure_future( + self.ksu.delete(op_id, op_params, db_content) + ) + self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_delete", task) + return + elif command == "clone": + task = asyncio.ensure_future( + self.ksu.clone(op_id, op_params, db_content) + ) + self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_clone", task) + return + elif command == "move": + task = asyncio.ensure_future( + self.ksu.move(op_id, op_params, db_content) + ) + self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task) + return + self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) async def kafka_read(self): self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True + self.logger.info( + "Consecutive errors: {} first start: {}".format( + self.consecutive_errors, self.first_start + ) + ) while self.consecutive_errors < 10: try: topics = ( @@ -648,16 +1093,30 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", + "cluster", + "k8s_app", + "k8s_resource", + "k8s_infra_controller", + "k8s_infra_config", + "oka", + "ksu", + ) + self.logger.info( + "Consecutive errors: {} first start: {}".format( + self.consecutive_errors, self.first_start + ) ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.loop, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.loop, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -680,47 +1139,72 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) 
wait_time = 2 if not self.first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def start(self): + async def kafka_read_ping(self): + await asyncio.gather(self.kafka_read(), self.kafka_ping()) + async def start(self): + self.logger.info("Start LCM") # check RO version - self.loop.run_until_complete(self.check_RO_version()) + await self.check_RO_version() - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config) + # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.config, self.loop, self.ns + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns ) - self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop) - self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop) - self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.config, self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) - self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.config, self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.cluster = k8s.ClusterLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.k8s_app = k8s.K8sAppLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_resource = k8s.K8sResourceLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_infra_controller = k8s.K8sInfraControllerLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_infra_config = k8s.K8sInfraConfigLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.cloud_credentials = k8s.CloudCredentialsLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.oka = k8s.OkaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.ksu = k8s.KsuLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) - self.loop.run_until_complete( - asyncio.gather(self.kafka_read(), self.kafka_ping()) + self.logger.info( + "Msg: {} lcm tasks: {} main config: {}".format( + self.msg, self.lcm_tasks, self.main_config + ) ) + await self.kafka_read_ping() + # TODO # self.logger.debug("Terminating cancelling creation tasks") # self.lcm_tasks.cancel("ALL", "create") # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. 
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
-        #     await asyncio.sleep(2, loop=self.loop)
+        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
-        self.loop.close()
-        self.loop = None
         if self.db:
             self.db.db_disconnect()
         if self.msg:
@@ -731,67 +1215,9 @@ class Lcm:
             self.fs.fs_disconnect()
 
     def read_config_file(self, config_file):
-        # TODO make a [ini] + yaml inside parser
-        # the configparser library is not suitable, because it does not admit comments at the end of line,
-        # and not parse integer or boolean
         try:
-            # read file as yaml format
             with open(config_file) as f:
-                conf = yaml.load(f, Loader=yaml.Loader)
-            # Ensure all sections are not empty
-            for k in (
-                "global",
-                "timeout",
-                "RO",
-                "VCA",
-                "database",
-                "storage",
-                "message",
-            ):
-                if not conf.get(k):
-                    conf[k] = {}
-
-            # read all environ that starts with OSMLCM_
-            for k, v in environ.items():
-                if not k.startswith("OSMLCM_"):
-                    continue
-                subject, _, item = k[7:].lower().partition("_")
-                if not item:
-                    continue
-                if subject in ("ro", "vca"):
-                    # put in capital letter
-                    subject = subject.upper()
-                try:
-                    if item == "port" or subject == "timeout":
-                        conf[subject][item] = int(v)
-                    else:
-                        conf[subject][item] = v
-                except Exception as e:
-                    self.logger.warning(
-                        "skipping environ '{}' on exception '{}'".format(k, e)
-                    )
-
-            # backward compatibility of VCA parameters
-
-            if "pubkey" in conf["VCA"]:
-                conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
-            if "cacert" in conf["VCA"]:
-                conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
-            if "apiproxy" in conf["VCA"]:
-                conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")
-
-            if "enableosupgrade" in conf["VCA"]:
-                conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
-            if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
-                if conf["VCA"]["enable_os_upgrade"].lower() == "false":
-                    conf["VCA"]["enable_os_upgrade"] = False
-                elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
-                    conf["VCA"]["enable_os_upgrade"] = True
-
-            if "aptmirror" in conf["VCA"]:
-                conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")
-
-            return conf
+                return yaml.safe_load(f)
         except Exception as e:
             self.logger.critical("At config file '{}': {}".format(config_file, e))
             exit(1)
@@ -803,18 +1229,22 @@ class Lcm:
         will provide a random one
         :return: Obtained ID
         """
-        # Try getting docker id. If fails, get pid
-        try:
-            with open("/proc/self/cgroup", "r") as f:
-                text_id_ = f.readline()
-                _, _, text_id = text_id_.rpartition("/")
-                text_id = text_id.replace("\n", "")[:12]
-            if text_id:
-                return text_id
-        except Exception:
-            pass
-        # Return a random id
-        return "".join(random_choice("0123456789abcdef") for _ in range(12))
+
+        def get_docker_id():
+            try:
+                with open("/proc/self/cgroup", "r") as f:
+                    text_id_ = f.readline()
+                    _, _, text_id = text_id_.rpartition("/")
+                    return text_id.replace("\n", "")[:12]
+            except Exception:
+                return None
+
+        def generate_random_id():
+            return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
+
+        # Try getting docker id. If it fails, generate a random id
+        docker_id = get_docker_id()
+        return docker_id if docker_id else generate_random_id()
 
 
 def usage():
@@ -832,7 +1262,6 @@ def usage():
 
 
 if __name__ == "__main__":
-
     try:
         # print("SYS.PATH='{}'".format(sys.path))
         # load parameters and configuration
@@ -855,15 +1284,10 @@
             elif o == "--health-check":
                 from osm_lcm.lcm_hc import health_check
 
-                health_check(health_check_file, Lcm.ping_interval_pace)
-            # elif o == "--log-socket-port":
-            #     log_socket_port = a
-            # elif o == "--log-socket-host":
-            #     log_socket_host = a
-            # elif o == "--log-file":
-            #     log_file = a
+                health_check(config_file, Lcm.ping_interval_pace)
             else:
-                assert False, "Unhandled option"
+                print(f"Unhandled option: {o}")
+                exit(1)
 
         if config_file:
             if not path.isfile(config_file):
@@ -886,8 +1310,9 @@
                     file=sys.stderr,
                 )
                 exit(1)
+        config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file)))
         lcm = Lcm(config_file)
-        lcm.start()
+        asyncio.run(lcm.start())
     except (LcmException, getopt.GetoptError) as e:
         print(str(e), file=sys.stderr)
         # usage()
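
Editor's note: the structural change running through this diff is the removal of the explicitly managed event loop (the stored self.loop, the loop= keyword arguments, and loop.run_until_complete(...)) in favour of coroutine methods driven by asyncio.run(lcm.start()). Below is a minimal, self-contained sketch of that migration pattern, not part of the patch itself; the Service class and its method bodies are hypothetical placeholders standing in for Lcm, kafka_read() and kafka_ping().

import asyncio


class Service:
    # Hypothetical stand-in for Lcm: start() is now a coroutine rather than a
    # blocking method that drives a loop it stores on the instance.
    async def kafka_read(self):
        await asyncio.sleep(0.1)  # placeholder for msg.aioread(...)

    async def kafka_ping(self):
        await asyncio.sleep(0.1)  # placeholder for the periodic ping writer

    async def start(self):
        # Equivalent of the patch's kafka_read_ping(): run both concurrently.
        await asyncio.gather(self.kafka_read(), self.kafka_ping())


if __name__ == "__main__":
    # Old style (what the patch removes):
    #     loop = asyncio.get_event_loop()
    #     loop.run_until_complete(service.start())
    #     loop.close()
    # New style: asyncio.run() creates, runs and closes the loop itself, which
    # is also why the loop= arguments to asyncio.sleep()/aioread() disappear
    # (they were removed from the standard library in Python 3.10).
    asyncio.run(Service().start())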