blob: 46e283669ecadf9f7d842634f4ab51dd7f9fd92a [file] [log] [blame]
#!/usr/bin/python3
# -*- coding: utf-8 -*-
##
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##
import asyncio
import getopt
import logging
import logging.handlers
import os
import sys
from os import path
import yaml
from osm_common.dbbase import DbException
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_utils import LcmException
from osm_lcm.temporal.juju_paas_activities import (
JujuPaasConnector,
CreateModelImpl,
CheckCharmStatusImpl,
DeployCharmImpl,
TestVimConnectivityImpl,
RemoveCharmImpl,
CheckCharmIsRemovedImpl,
ResolveCharmErrorsImpl,
)
from osm_lcm.temporal.lcm_activities import NsLcmNoOpImpl, UpdateNsLcmOperationStateImpl
from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflowImpl
from osm_lcm.temporal.ns_activities import (
DeleteNsRecordImpl,
GetVnfDetailsImpl,
GetNsRecordImpl,
UpdateNsStateImpl,
)
from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflowImpl
from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflowImpl
from osm_lcm.temporal.vim_activities import (
UpdateVimStateImpl,
UpdateVimOperationStateImpl,
DeleteVimRecordImpl,
)
from osm_lcm.temporal.vim_workflows import (
VimCreateWorkflowImpl,
VimDeleteWorkflowImpl,
VimUpdateWorkflowImpl,
)
from osm_lcm.temporal.vnf_activities import (
DeleteVnfRecordImpl,
GetTaskQueueImpl,
GetVnfDescriptorImpl,
GetVnfRecordImpl,
GetVimCloudImpl,
ChangeVnfStateImpl,
SetVnfModelImpl,
ChangeVnfInstantiationStateImpl,
SendNotificationForVnfImpl,
)
from osm_lcm.temporal.vnf_workflows import (
VnfDeleteWorkflowImpl,
VnfInstantiateWorkflowImpl,
VnfPrepareWorkflowImpl,
VnfTerminateWorkflowImpl,
)
from temporalio.client import Client
from temporalio.worker import Worker
class NGLcm:
    """
    LCM worker process built on Temporal.

    Loads the LCM configuration, configures logging, connects to the database
    and runs a Temporal worker that serves the LCM workflows and activities.
    """

    # Configuration holder. It is a class attribute, so it is shared by every
    # instance of NGLcm.
    main_config = LcmCfg()

    # Rotation policy applied to every log file opened by this class.
    _LOG_MAX_BYTES = 100_000_000
    _LOG_BACKUP_COUNT = 9

    def __init__(self, config_file):
        """
        Load the configuration, configure logging and connect to the database.

        :param config_file: path of the YAML configuration file to load
        :raises LcmException: when the database layer raises DbException
        :return: None
        """
        self.db: Database = None
        self.logger = logging.getLogger("lcm")
        self._load_configuration(config_file)
        self._configure_logging()
        try:
            self.db = Database(self.main_config.to_dict()).instance.db
        except DbException as e:
            self.logger.critical(str(e), exc_info=True)
            # Keep the original DbException attached as the cause.
            raise LcmException(str(e)) from e

    def _load_configuration(self, config_file):
        """
        Populate ``main_config`` from the configuration file and the environment.

        :param config_file: path of the YAML configuration file to load
        :return: None
        """
        config = self._read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        # NOTE(review): this dumps the complete configuration at CRITICAL level;
        # if the configuration carries credentials they end up in the log.
        self.logger.critical("Loaded configuration:%s", self.main_config.to_dict())

    def _read_config_file(self, config_file):
        """
        Parse the YAML configuration file.

        Any failure to open or parse the file is logged and terminates the
        process with exit status 1.

        :param config_file: path of the YAML configuration file
        :return: the object produced by ``yaml.safe_load``
        """
        try:
            with open(config_file) as f:
                return yaml.safe_load(f)
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            # sys.exit() rather than the site-provided exit(), which is only
            # guaranteed to exist in interactive sessions.
            sys.exit(1)

    @staticmethod
    def _get_log_formatter_simple():
        """Return the formatter shared by all handlers installed by this class."""
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        return logging.Formatter(log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S")

    def _create_file_handler(self, logfile=None):
        """
        Build a rotating file handler.

        :param logfile: path of the log file; defaults to the global ``logfile``
            setting when omitted
        :return: a ``logging.handlers.RotatingFileHandler`` without formatter
        """
        if logfile is None:
            logfile = self.main_config.globalConfig.logfile
        return logging.handlers.RotatingFileHandler(
            logfile,
            maxBytes=self._LOG_MAX_BYTES,
            backupCount=self._LOG_BACKUP_COUNT,
            delay=False,
        )

    def _log_other_modules(self):
        """
        Configure the loggers of the auxiliary modules.

        For each of the "message", "database", "storage", "tsdb" and "temporal"
        configuration sections, attach a rotating file handler when a
        ``logfile`` is set and apply ``loglevel`` when one is set.
        """
        # The configuration does not change inside the loop; build it once.
        config = self.main_config.to_dict()
        for section in ("message", "database", "storage", "tsdb", "temporal"):
            logger_config = config[section]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = self._create_file_handler(logger_config["logfile"])
                file_handler.setFormatter(self._get_log_formatter_simple())
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        # Only CRITICAL records from the juju connection logger are let through.
        logging.getLogger("juju.client.connection").setLevel(logging.CRITICAL)

    def _configure_logging(self):
        """
        Install handlers and level on the "lcm" logger, then set up the
        loggers of the auxiliary modules.
        """
        global_config = self.main_config.globalConfig
        if global_config.logfile:
            file_handler = self._create_file_handler()
            file_handler.setFormatter(self._get_log_formatter_simple())
            self.logger.addHandler(file_handler)
        if not global_config.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(self._get_log_formatter_simple())
            self.logger.addHandler(str_handler)
        if global_config.to_dict()["loglevel"]:
            self.logger.setLevel(global_config.loglevel)
        self._log_other_modules()
        self.logger.critical("starting osm/nglcm")

    async def start(self):
        """
        Connect to the Temporal server and run the LCM worker.

        The server address is taken from the ``temporal`` configuration section
        (``host`` and ``port``). All workflows and activities listed below are
        registered on ``LCM_TASK_QUEUE``.
        """
        temporal_api = (
            f"{self.main_config.temporal.host}:{self.main_config.temporal.port}"
        )
        client = await Client.connect(temporal_api)
        # A single connector instance is shared by all Juju PaaS activities.
        paas_connector_instance = JujuPaasConnector(self.db)
        workflows = [
            NsInstantiateWorkflowImpl,
            NsNoOpWorkflowImpl,
            VimCreateWorkflowImpl,
            VimDeleteWorkflowImpl,
            VimUpdateWorkflowImpl,
            VduInstantiateWorkflowImpl,
            VnfDeleteWorkflowImpl,
            VnfInstantiateWorkflowImpl,
            VnfPrepareWorkflowImpl,
            VnfTerminateWorkflowImpl,
        ]
        activities = [
            UpdateNsStateImpl(self.db),
            GetVnfDetailsImpl(self.db),
            GetNsRecordImpl(self.db),
            UpdateNsLcmOperationStateImpl(self.db),
            NsLcmNoOpImpl(),
            CreateModelImpl(self.db, paas_connector_instance),
            DeployCharmImpl(paas_connector_instance),
            CheckCharmStatusImpl(paas_connector_instance),
            TestVimConnectivityImpl(paas_connector_instance),
            RemoveCharmImpl(paas_connector_instance),
            CheckCharmIsRemovedImpl(paas_connector_instance),
            ResolveCharmErrorsImpl(paas_connector_instance),
            UpdateVimOperationStateImpl(self.db),
            UpdateVimStateImpl(self.db),
            DeleteVimRecordImpl(self.db),
            ChangeVnfStateImpl(self.db),
            ChangeVnfInstantiationStateImpl(self.db),
            DeleteNsRecordImpl(self.db),
            DeleteVnfRecordImpl(self.db),
            GetTaskQueueImpl(self.db),
            GetVimCloudImpl(self.db),
            GetVnfDescriptorImpl(self.db),
            GetVnfRecordImpl(self.db),
            SendNotificationForVnfImpl(),
            SetVnfModelImpl(self.db),
        ]
        # Check if we are running under a debugger: the presence of the
        # VSCODE_IPC_HOOK_CLI environment variable is used as the signal.
        debug = os.getenv("VSCODE_IPC_HOOK_CLI") is not None
        worker = Worker(
            client,
            task_queue=LCM_TASK_QUEUE,
            workflows=workflows,
            activities=activities,
            debug_mode=debug,
        )
        self.logger.info("Starting LCM temporal worker")
        await worker.run()
def _parse_config_option(argv):
    """
    Parse the command line and return the configuration file it names.

    ``-h``/``--help`` prints a usage line and exits with status 0.

    :param argv: command-line arguments, without the program name
    :return: value given to ``-c``/``--config``, or None when it is absent
    :raises getopt.GetoptError: on an unknown option, a missing argument, or an
        option the parser accepts but that has no implementation here
    """
    opts, _args = getopt.getopt(argv, "hc:", ["config=", "help", "health-check"])
    # TODO add "log-socket-host=", "log-socket-port=", "log-file="
    config_file = None
    for option, value in opts:
        if option in ("-c", "--config"):
            config_file = value
        elif option in ("-h", "--help"):
            print(
                "Usage: {} [-c|--config <config file>] [-h|--help]".format(
                    sys.argv[0]
                )
            )
            sys.exit(0)
        else:
            # Raised instead of `assert False`, which is stripped under -O and
            # otherwise surfaces as an uncaught AssertionError traceback.
            raise getopt.GetoptError(
                "option {} is not implemented".format(option), option
            )
    return config_file


def _locate_config_file(config_file):
    """
    Return the path of an existing configuration file or exit with status 1.

    :param config_file: path requested on the command line, or None/empty to
        search the default locations (next to this script, ``./lcm.cfg``,
        ``/etc/osm/lcm.cfg``)
    :return: path of the configuration file to load
    """
    if config_file:
        if not path.isfile(config_file):
            print(
                "configuration file '{}' does not exist".format(config_file),
                file=sys.stderr,
            )
            sys.exit(1)
        return config_file

    candidates = (
        # splitext() is safe even when the script name carries no extension.
        path.splitext(__file__)[0] + ".cfg",
        "./lcm.cfg",
        "/etc/osm/lcm.cfg",
    )
    for candidate in candidates:
        # Each probed location is echoed on stdout.
        print(candidate)
        if path.isfile(candidate):
            return candidate
    print(
        "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
        file=sys.stderr,
    )
    sys.exit(1)


def main(argv=None):
    """
    Script entry point: parse arguments, find the configuration, run the worker.

    Command-line errors and LcmException are reported on stderr and end the
    process with exit status 1.

    :param argv: command-line arguments without the program name; defaults to
        ``sys.argv[1:]``
    :return: None
    """
    if argv is None:
        argv = sys.argv[1:]
    try:
        config_file = _parse_config_option(argv)
    except getopt.GetoptError as e:
        print(str(e), file=sys.stderr)
        sys.exit(1)

    config_file = _locate_config_file(config_file)

    try:
        lcm = NGLcm(config_file)
        asyncio.run(lcm.start())
    except LcmException as e:
        print(str(e), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()