from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_hc import get_health_check_file
-from os import environ, path
+from os import path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback
120 # how many time ping is send once is confirmed all is running
)
ping_interval_boot = 5 # how many time ping is sent when booting
- cfg_logger_name = {
- "message": "lcm.msg",
- "database": "lcm.db",
- "storage": "lcm.fs",
- "tsdb": "lcm.prometheus",
- }
- # ^ contains for each section at lcm.cfg the used logger name
+
+ main_config = LcmCfg()
def __init__(self, config_file, loop=None):
"""
self.worker_id = self.get_process_id()
# load configuration
config = self.read_config_file(config_file)
- self.config = config
- self.health_check_file = get_health_check_file(self.config)
- self.config["ro_config"] = {
- "ng": config["RO"].get("ng", False),
- "uri": config["RO"].get("uri"),
- "tenant": config.get("tenant", "osm"),
- "logger_name": "lcm.roclient",
- "loglevel": config["RO"].get("loglevel", "ERROR"),
- }
- if not self.config["ro_config"]["uri"]:
- self.config["ro_config"]["uri"] = "http://{}:{}/".format(
- config["RO"]["host"], config["RO"]["port"]
- )
- elif (
- "/ro" in self.config["ro_config"]["uri"][-4:]
- or "/openmano" in self.config["ro_config"]["uri"][-10:]
- ):
- # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
- index = self.config["ro_config"]["uri"][-1].rfind("/")
- self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index + 1]
-
+ self.main_config.set_from_dict(config)
+ self.main_config.transform()
+ self.main_config.load_from_env()
+ self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
+ # TODO: check if lcm_hc.py is necessary
+ self.health_check_file = get_health_check_file(self.main_config.to_dict())
self.loop = loop or asyncio.get_event_loop()
self.ns = (
self.netslice
log_formatter_simple = logging.Formatter(
log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
)
- config["database"]["logger_name"] = "lcm.db"
- config["storage"]["logger_name"] = "lcm.fs"
- config["message"]["logger_name"] = "lcm.msg"
- if config["global"].get("logfile"):
+ if self.main_config.globalConfig.logfile:
file_handler = logging.handlers.RotatingFileHandler(
- config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
+ self.main_config.globalConfig.logfile,
+ maxBytes=100e6,
+ backupCount=9,
+ delay=0,
)
file_handler.setFormatter(log_formatter_simple)
self.logger.addHandler(file_handler)
- if not config["global"].get("nologging"):
+ if not self.main_config.globalConfig.to_dict()["nologging"]:
str_handler = logging.StreamHandler()
str_handler.setFormatter(log_formatter_simple)
self.logger.addHandler(str_handler)
- if config["global"].get("loglevel"):
- self.logger.setLevel(config["global"]["loglevel"])
+ if self.main_config.globalConfig.to_dict()["loglevel"]:
+ self.logger.setLevel(self.main_config.globalConfig.loglevel)
# logging other modules
- for k1, logname in self.cfg_logger_name.items():
- config[k1]["logger_name"] = logname
- logger_module = logging.getLogger(logname)
- if config[k1].get("logfile"):
+ for logger in ("message", "database", "storage", "tsdb"):
+ logger_config = self.main_config.to_dict()[logger]
+ logger_module = logging.getLogger(logger_config["logger_name"])
+ if logger_config["logfile"]:
file_handler = logging.handlers.RotatingFileHandler(
- config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
+ logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
)
file_handler.setFormatter(log_formatter_simple)
logger_module.addHandler(file_handler)
- if config[k1].get("loglevel"):
- logger_module.setLevel(config[k1]["loglevel"])
+ if logger_config["loglevel"]:
+ logger_module.setLevel(logger_config["loglevel"])
self.logger.critical(
"starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
)
)
try:
- self.db = Database(config).instance.db
+ self.db = Database(self.main_config.to_dict()).instance.db
- self.fs = Filesystem(config).instance.fs
+ self.fs = Filesystem(self.main_config.to_dict()).instance.fs
self.fs.sync()
# copy message configuration in order to remove 'group_id' for msg_admin
- config_message = config["message"].copy()
+ config_message = self.main_config.message.to_dict()
config_message["loop"] = self.loop
if config_message["driver"] == "local":
self.msg = msglocal.MsgLocal()
else:
raise LcmException(
"Invalid configuration param '{}' at '[message]':'driver'".format(
- config["message"]["driver"]
+ self.main_config.message.driver
)
)
except (DbException, FsException, MsgException) as e:
tries = 14
last_error = None
while True:
- ro_uri = self.config["ro_config"]["uri"]
+ ro_uri = self.main_config.RO.uri
+ if not ro_uri:
+ ro_uri = ""
try:
# try new RO, if fail old RO
try:
- self.config["ro_config"]["uri"] = ro_uri + "ro"
- ro_server = NgRoClient(self.loop, **self.config["ro_config"])
+ self.main_config.RO.uri = ro_uri + "ro"
+ ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
ro_version = await ro_server.get_version()
- self.config["ro_config"]["ng"] = True
+ self.main_config.RO.ng = True
except Exception:
- self.config["ro_config"]["uri"] = ro_uri + "openmano"
- ro_server = ROClient(self.loop, **self.config["ro_config"])
+ self.main_config.RO.uri = ro_uri + "openmano"
+ ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
ro_version = await ro_server.get_version()
- self.config["ro_config"]["ng"] = False
+ self.main_config.RO.ng = False
if versiontuple(ro_version) < versiontuple(min_RO_version):
raise LcmException(
"Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
)
self.logger.info(
"Connected to RO version {} new-generation version {}".format(
- ro_version, self.config["ro_config"]["ng"]
+ ro_version, self.main_config.RO.ng
)
)
return
except (ROClientException, NgRoException) as e:
- self.config["ro_config"]["uri"] = ro_uri
+ self.main_config.RO.uri = ro_uri
tries -= 1
traceback.print_tb(e.__traceback__)
error_text = "Error while connecting to RO on {}: {}".format(
- self.config["ro_config"]["uri"], e
+ self.main_config.RO.uri, e
)
if tries <= 0:
self.logger.critical(error_text)
"k8scluster", k8scluster_id, order_id, "k8scluster_create", task
)
return
+ elif command == "edit" or command == "edited":
+ k8scluster_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
+ self.lcm_tasks.register(
+ "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
+ )
+ return
elif command == "delete" or command == "deleted":
k8scluster_id = params.get("_id")
task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
task = asyncio.ensure_future(self.vca.create(params, order_id))
self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
return
+ elif command == "edit" or command == "edited":
+ vca_id = params.get("_id")
+ task = asyncio.ensure_future(self.vca.edit(params, order_id))
+ self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
+ return
elif command == "delete" or command == "deleted":
vca_id = params.get("_id")
task = asyncio.ensure_future(self.vca.delete(params, order_id))
nslcmop_id = nslcmop["_id"]
nsr_id = nslcmop["nsInstanceId"]
task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
- self.lcm_tasks.register(
- "ns", nsr_id, nslcmop_id, "ns_heal", task
- )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
return
elif command == "migrate":
nslcmop = params
nslcmop_id = nslcmop["_id"]
nsr_id = nslcmop["nsInstanceId"]
task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
- self.logger.debug("nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task))
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_verticalscale", task)
- self.logger.debug("LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task))
+ self.logger.debug(
+ "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
+ )
+ self.lcm_tasks.register(
+ "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
+ )
+ self.logger.debug(
+ "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
+ )
return
elif command == "show":
nsr_id = params
db_nsr["config-status"],
db_nsr["detailed-status"],
db_nsr["_admin"]["deployed"],
- self.lcm_ns_tasks.get(nsr_id),
+ self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
)
)
except Exception as e:
db_nsir["config-status"],
db_nsir["detailed-status"],
db_nsir["_admin"]["deployed"],
- self.lcm_netslice_tasks.get(nsir_id),
+ self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
)
)
except Exception as e:
elif topic == "vim_account":
vim_id = params["_id"]
if command in ("create", "created"):
- if not self.config["ro_config"].get("ng"):
+ if not self.main_config.RO.ng:
task = asyncio.ensure_future(self.vim.create(params, order_id))
self.lcm_tasks.register(
"vim_account", vim_id, order_id, "vim_create", task
sys.stdout.flush()
return
elif command in ("edit", "edited"):
- if not self.config["ro_config"].get("ng"):
+ if not self.main_config.RO.ng:
task = asyncio.ensure_future(self.vim.edit(params, order_id))
self.lcm_tasks.register(
"vim_account", vim_id, order_id, "vim_edit", task
elif topic == "wim_account":
wim_id = params["_id"]
if command in ("create", "created"):
- if not self.config["ro_config"].get("ng"):
+ if not self.main_config.RO.ng:
task = asyncio.ensure_future(self.wim.create(params, order_id))
self.lcm_tasks.register(
"wim_account", wim_id, order_id, "wim_create", task
elif topic == "sdn":
_sdn_id = params["_id"]
if command in ("create", "created"):
- if not self.config["ro_config"].get("ng"):
+ if not self.main_config.RO.ng:
task = asyncio.ensure_future(self.sdn.create(params, order_id))
self.lcm_tasks.register(
"sdn", _sdn_id, order_id, "sdn_create", task
# check RO version
self.loop.run_until_complete(self.check_RO_version())
- self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
+ # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
self.netslice = netslice.NetsliceLcm(
- self.msg, self.lcm_tasks, self.config, self.loop, self.ns
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
+ )
+ self.vim = vim_sdn.VimLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ )
+ self.wim = vim_sdn.WimLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ )
+ self.sdn = vim_sdn.SdnLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
)
- self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
- self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
- self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
self.k8scluster = vim_sdn.K8sClusterLcm(
- self.msg, self.lcm_tasks, self.config, self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ )
+ self.vca = vim_sdn.VcaLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
)
- self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
self.k8srepo = vim_sdn.K8sRepoLcm(
- self.msg, self.lcm_tasks, self.config, self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
)
self.loop.run_until_complete(
self.fs.fs_disconnect()
def read_config_file(self, config_file):
- # TODO make a [ini] + yaml inside parser
- # the configparser library is not suitable, because it does not admit comments at the end of line,
- # and not parse integer or boolean
try:
- # read file as yaml format
with open(config_file) as f:
- conf = yaml.safe_load(f)
- # Ensure all sections are not empty
- for k in (
- "global",
- "timeout",
- "RO",
- "VCA",
- "database",
- "storage",
- "message",
- ):
- if not conf.get(k):
- conf[k] = {}
-
- # read all environ that starts with OSMLCM_
- for k, v in environ.items():
- if not k.startswith("OSMLCM_"):
- continue
- subject, _, item = k[7:].lower().partition("_")
- if not item:
- continue
- if subject in ("ro", "vca"):
- # put in capital letter
- subject = subject.upper()
- try:
- if item == "port" or subject == "timeout":
- conf[subject][item] = int(v)
- else:
- conf[subject][item] = v
- except Exception as e:
- self.logger.warning(
- "skipping environ '{}' on exception '{}'".format(k, e)
- )
-
- # backward compatibility of VCA parameters
-
- if "pubkey" in conf["VCA"]:
- conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
- if "cacert" in conf["VCA"]:
- conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
- if "apiproxy" in conf["VCA"]:
- conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")
-
- if "enableosupgrade" in conf["VCA"]:
- conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
- if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
- if conf["VCA"]["enable_os_upgrade"].lower() == "false":
- conf["VCA"]["enable_os_upgrade"] = False
- elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
- conf["VCA"]["enable_os_upgrade"] = True
-
- if "aptmirror" in conf["VCA"]:
- conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")
-
- return conf
+ return yaml.safe_load(f)
except Exception as e:
self.logger.critical("At config file '{}': {}".format(config_file, e))
exit(1)