X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=5638943c6fb01bd934bf653355fbe30a402e7cb5;hb=a89a5a7e3b0421a5ae1859235dfef6b6adf24279;hp=9eb37fadfe625dc922f428d4f71bb3f717436c60;hpb=b9e357cbb5c38a95b226213a27adedae740eb1dc;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 9eb37fa..5638943 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -44,7 +44,9 @@ from osm_common.fsbase import FsException from osm_common.msgbase import MsgException from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem -from os import environ, path +from osm_lcm.data_utils.lcm_config import LcmCfg +from osm_lcm.lcm_hc import get_health_check_file +from os import path from random import choice as random_choice from n2vc import version as n2vc_version import traceback @@ -58,9 +60,6 @@ min_RO_version = "6.0.2" min_n2vc_version = "0.0.2" min_common_version = "0.1.19" -health_check_file = ( - path.expanduser("~") + "/time_last_ping" -) # TODO find better location for this file class Lcm: @@ -69,13 +68,8 @@ class Lcm: 120 # how many time ping is send once is confirmed all is running ) ping_interval_boot = 5 # how many time ping is sent when booting - cfg_logger_name = { - "message": "lcm.msg", - "database": "lcm.db", - "storage": "lcm.fs", - "tsdb": "lcm.prometheus", - } - # ^ contains for each section at lcm.cfg the used logger name + + main_config = LcmCfg() def __init__(self, config_file, loop=None): """ @@ -97,26 +91,12 @@ class Lcm: self.worker_id = self.get_process_id() # load configuration config = self.read_config_file(config_file) - self.config = config - self.config["ro_config"] = { - "ng": config["RO"].get("ng", False), - "uri": config["RO"].get("uri"), - "tenant": config.get("tenant", "osm"), - "logger_name": "lcm.roclient", - "loglevel": config["RO"].get("loglevel", "ERROR"), - } - if not self.config["ro_config"]["uri"]: - self.config["ro_config"]["uri"] = "http://{}:{}/".format( - config["RO"]["host"], config["RO"]["port"] - ) - elif ( - "/ro" in self.config["ro_config"]["uri"][-4:] - or "/openmano" in self.config["ro_config"]["uri"][-10:] - ): - # uri ends with '/ro', '/ro/', '/openmano', '/openmano/' - index = self.config["ro_config"]["uri"][-1].rfind("/") - self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index + 1] - + self.main_config.set_from_dict(config) + self.main_config.transform() + self.main_config.load_from_env() + self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict())) + # TODO: check if lcm_hc.py is necessary + self.health_check_file = get_health_check_file(self.main_config.to_dict()) self.loop = loop or asyncio.get_event_loop() self.ns = ( self.netslice @@ -131,35 +111,35 @@ class Lcm: log_formatter_simple = logging.Formatter( log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S" ) - config["database"]["logger_name"] = "lcm.db" - config["storage"]["logger_name"] = "lcm.fs" - config["message"]["logger_name"] = "lcm.msg" - if config["global"].get("logfile"): + if self.main_config.globalConfig.logfile: file_handler = logging.handlers.RotatingFileHandler( - config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0 + self.main_config.globalConfig.logfile, + maxBytes=100e6, + backupCount=9, + delay=0, ) file_handler.setFormatter(log_formatter_simple) self.logger.addHandler(file_handler) - if not config["global"].get("nologging"): + if not self.main_config.globalConfig.to_dict()["nologging"]: str_handler = logging.StreamHandler() str_handler.setFormatter(log_formatter_simple) self.logger.addHandler(str_handler) - if config["global"].get("loglevel"): - self.logger.setLevel(config["global"]["loglevel"]) + if self.main_config.globalConfig.to_dict()["loglevel"]: + self.logger.setLevel(self.main_config.globalConfig.loglevel) # logging other modules - for k1, logname in self.cfg_logger_name.items(): - config[k1]["logger_name"] = logname - logger_module = logging.getLogger(logname) - if config[k1].get("logfile"): + for logger in ("message", "database", "storage", "tsdb"): + logger_config = self.main_config.to_dict()[logger] + logger_module = logging.getLogger(logger_config["logger_name"]) + if logger_config["logfile"]: file_handler = logging.handlers.RotatingFileHandler( - config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0 + logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0 ) file_handler.setFormatter(log_formatter_simple) logger_module.addHandler(file_handler) - if config[k1].get("loglevel"): - logger_module.setLevel(config[k1]["loglevel"]) + if logger_config["loglevel"]: + logger_module.setLevel(logger_config["loglevel"]) self.logger.critical( "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date) ) @@ -182,13 +162,13 @@ class Lcm: ) try: - self.db = Database(config).instance.db + self.db = Database(self.main_config.to_dict()).instance.db - self.fs = Filesystem(config).instance.fs + self.fs = Filesystem(self.main_config.to_dict()).instance.fs self.fs.sync() # copy message configuration in order to remove 'group_id' for msg_admin - config_message = config["message"].copy() + config_message = self.main_config.message.to_dict() config_message["loop"] = self.loop if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() @@ -205,7 +185,7 @@ class Lcm: else: raise LcmException( "Invalid configuration param '{}' at '[message]':'driver'".format( - config["message"]["driver"] + self.main_config.message.driver ) ) except (DbException, FsException, MsgException) as e: @@ -219,19 +199,21 @@ class Lcm: tries = 14 last_error = None while True: - ro_uri = self.config["ro_config"]["uri"] + ro_uri = self.main_config.RO.uri + if not ro_uri: + ro_uri = "" try: # try new RO, if fail old RO try: - self.config["ro_config"]["uri"] = ro_uri + "ro" - ro_server = NgRoClient(self.loop, **self.config["ro_config"]) + self.main_config.RO.uri = ro_uri + "ro" + ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() - self.config["ro_config"]["ng"] = True + self.main_config.RO.ng = True except Exception: - self.config["ro_config"]["uri"] = ro_uri + "openmano" - ro_server = ROClient(self.loop, **self.config["ro_config"]) + self.main_config.RO.uri = ro_uri + "openmano" + ro_server = ROClient(self.loop, **self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() - self.config["ro_config"]["ng"] = False + self.main_config.RO.ng = False if versiontuple(ro_version) < versiontuple(min_RO_version): raise LcmException( "Not compatible osm/RO version '{}'. Needed '{}' or higher".format( @@ -240,16 +222,16 @@ class Lcm: ) self.logger.info( "Connected to RO version {} new-generation version {}".format( - ro_version, self.config["ro_config"]["ng"] + ro_version, self.main_config.RO.ng ) ) return except (ROClientException, NgRoException) as e: - self.config["ro_config"]["uri"] = ro_uri + self.main_config.RO.uri = ro_uri tries -= 1 traceback.print_tb(e.__traceback__) error_text = "Error while connecting to RO on {}: {}".format( - self.config["ro_config"]["uri"], e + self.main_config.RO.uri, e ) if tries <= 0: self.logger.critical(error_text) @@ -345,12 +327,12 @@ class Lcm: return self.pings_not_received = 0 try: - with open(health_check_file, "w") as f: + with open(self.health_check_file, "w") as f: f.write(str(time())) except Exception as e: self.logger.error( "Cannot write into '{}' for healthcheck: {}".format( - health_check_file, e + self.health_check_file, e ) ) return @@ -455,6 +437,14 @@ class Lcm: task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) return + elif command == "heal": + # self.logger.debug("Healing NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task) + return elif command == "migrate": nslcmop = params nslcmop_id = nslcmop["_id"] @@ -462,6 +452,21 @@ class Lcm: task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task) return + elif command == "verticalscale": + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id)) + self.logger.debug( + "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task) + ) + self.lcm_tasks.register( + "ns", nsr_id, nslcmop_id, "ns_verticalscale", task + ) + self.logger.debug( + "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task) + ) + return elif command == "show": nsr_id = params try: @@ -490,9 +495,11 @@ class Lcm: "terminated", "instantiated", "scaled", + "healed", "actioned", "updated", "migrated", + "verticalscaled", ): # "scaled-cooldown-time" return @@ -548,13 +555,14 @@ class Lcm: "terminated", "instantiated", "scaled", + "healed", "actioned", ): # "scaled-cooldown-time" return elif topic == "vim_account": vim_id = params["_id"] if command in ("create", "created"): - if not self.config["ro_config"].get("ng"): + if not self.main_config.RO.ng: task = asyncio.ensure_future(self.vim.create(params, order_id)) self.lcm_tasks.register( "vim_account", vim_id, order_id, "vim_create", task @@ -572,7 +580,7 @@ class Lcm: sys.stdout.flush() return elif command in ("edit", "edited"): - if not self.config["ro_config"].get("ng"): + if not self.main_config.RO.ng: task = asyncio.ensure_future(self.vim.edit(params, order_id)) self.lcm_tasks.register( "vim_account", vim_id, order_id, "vim_edit", task @@ -583,7 +591,7 @@ class Lcm: elif topic == "wim_account": wim_id = params["_id"] if command in ("create", "created"): - if not self.config["ro_config"].get("ng"): + if not self.main_config.RO.ng: task = asyncio.ensure_future(self.wim.create(params, order_id)) self.lcm_tasks.register( "wim_account", wim_id, order_id, "wim_create", task @@ -611,7 +619,7 @@ class Lcm: elif topic == "sdn": _sdn_id = params["_id"] if command in ("create", "created"): - if not self.config["ro_config"].get("ng"): + if not self.main_config.RO.ng: task = asyncio.ensure_future(self.sdn.create(params, order_id)) self.lcm_tasks.register( "sdn", _sdn_id, order_id, "sdn_create", task @@ -691,19 +699,28 @@ class Lcm: # check RO version self.loop.run_until_complete(self.check_RO_version()) - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop) + # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.config, self.loop, self.ns + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns + ) + self.vim = vim_sdn.VimLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + ) + self.wim = vim_sdn.WimLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + ) + self.sdn = vim_sdn.SdnLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop ) - self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop) - self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop) - self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.config, self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + ) + self.vca = vim_sdn.VcaLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop ) - self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.config, self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop ) self.loop.run_until_complete( @@ -732,67 +749,9 @@ class Lcm: self.fs.fs_disconnect() def read_config_file(self, config_file): - # TODO make a [ini] + yaml inside parser - # the configparser library is not suitable, because it does not admit comments at the end of line, - # and not parse integer or boolean try: - # read file as yaml format with open(config_file) as f: - conf = yaml.load(f, Loader=yaml.Loader) - # Ensure all sections are not empty - for k in ( - "global", - "timeout", - "RO", - "VCA", - "database", - "storage", - "message", - ): - if not conf.get(k): - conf[k] = {} - - # read all environ that starts with OSMLCM_ - for k, v in environ.items(): - if not k.startswith("OSMLCM_"): - continue - subject, _, item = k[7:].lower().partition("_") - if not item: - continue - if subject in ("ro", "vca"): - # put in capital letter - subject = subject.upper() - try: - if item == "port" or subject == "timeout": - conf[subject][item] = int(v) - else: - conf[subject][item] = v - except Exception as e: - self.logger.warning( - "skipping environ '{}' on exception '{}'".format(k, e) - ) - - # backward compatibility of VCA parameters - - if "pubkey" in conf["VCA"]: - conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey") - if "cacert" in conf["VCA"]: - conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert") - if "apiproxy" in conf["VCA"]: - conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy") - - if "enableosupgrade" in conf["VCA"]: - conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade") - if isinstance(conf["VCA"].get("enable_os_upgrade"), str): - if conf["VCA"]["enable_os_upgrade"].lower() == "false": - conf["VCA"]["enable_os_upgrade"] = False - elif conf["VCA"]["enable_os_upgrade"].lower() == "true": - conf["VCA"]["enable_os_upgrade"] = True - - if "aptmirror" in conf["VCA"]: - conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror") - - return conf + return yaml.safe_load(f) except Exception as e: self.logger.critical("At config file '{}': {}".format(config_file, e)) exit(1) @@ -856,7 +815,7 @@ if __name__ == "__main__": elif o == "--health-check": from osm_lcm.lcm_hc import health_check - health_check(health_check_file, Lcm.ping_interval_pace) + health_check(config_file, Lcm.ping_interval_pace) # elif o == "--log-socket-port": # log_socket_port = a # elif o == "--log-socket-host":