blob: 6972477ef93d4ded264d43c969f18d8b3eeb18e9 [file] [log] [blame]
Mark Beierl821bfc92023-01-24 21:15:25 -05001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
4##
5# Copyright 2018 Telefonica S.A.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18##
19
20
21import asyncio
22import getopt
23import logging
24import logging.handlers
25import sys
26import yaml
27
28from osm_lcm.lcm_utils import LcmException
29
30from osm_common.dbbase import DbException
31from osm_lcm.data_utils.database.database import Database
32from osm_lcm.data_utils.lcm_config import LcmCfg
33from os import path
34from temporalio import workflow
35from temporalio.client import Client
36from temporalio.worker import Worker
37
38
39class NGLcm:
Mark Beierl821bfc92023-01-24 21:15:25 -050040 main_config = LcmCfg()
41
42 def __init__(self, config_file, loop=None):
43 """
44 Init, Connect to database, filesystem storage, and messaging
45 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
46 :return: None
47 """
48 self.db = None
49
50 # logging
51 self.logger = logging.getLogger("lcm")
52 # load configuration
53 config = self.read_config_file(config_file)
54 self.main_config.set_from_dict(config)
55 self.main_config.transform()
56 self.main_config.load_from_env()
57 self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
58
59 # logging
60 log_format_simple = (
61 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
62 )
63 log_formatter_simple = logging.Formatter(
64 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
65 )
66 if self.main_config.globalConfig.logfile:
67 file_handler = logging.handlers.RotatingFileHandler(
68 self.main_config.globalConfig.logfile,
69 maxBytes=100e6,
70 backupCount=9,
71 delay=0,
72 )
73 file_handler.setFormatter(log_formatter_simple)
74 self.logger.addHandler(file_handler)
75 if not self.main_config.globalConfig.to_dict()["nologging"]:
76 str_handler = logging.StreamHandler()
77 str_handler.setFormatter(log_formatter_simple)
78 self.logger.addHandler(str_handler)
79
80 if self.main_config.globalConfig.to_dict()["loglevel"]:
81 self.logger.setLevel(self.main_config.globalConfig.loglevel)
82
83 # logging other modules
84 for logger in ("message", "database", "storage", "tsdb"):
85 logger_config = self.main_config.to_dict()[logger]
86 logger_module = logging.getLogger(logger_config["logger_name"])
87 if logger_config["logfile"]:
88 file_handler = logging.handlers.RotatingFileHandler(
89 logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
90 )
91 file_handler.setFormatter(log_formatter_simple)
92 logger_module.addHandler(file_handler)
93 if logger_config["loglevel"]:
94 logger_module.setLevel(logger_config["loglevel"])
95 self.logger.critical("starting osm/nglcm")
96
97 try:
98 self.db = Database(self.main_config.to_dict()).instance.db
Mark Beierl90f700d2023-02-09 15:01:33 -050099 except DbException as e:
Mark Beierl821bfc92023-01-24 21:15:25 -0500100 self.logger.critical(str(e), exc_info=True)
101 raise LcmException(str(e))
102
103 async def start(self):
104 # do some temporal stuff here
105 temporal_api = (
106 f"{self.main_config.temporal.host}:{str(self.main_config.temporal.port)}"
107 )
108 self.logger.info(f"Attempting to register with Temporal at {temporal_api}")
109 client = await Client.connect(temporal_api)
110
111 task_queue = "lcm-task-queue"
112 workflows = [
113 Heartbeat,
114 ]
115 activities = []
116
117 worker = Worker(
118 client, task_queue=task_queue, workflows=workflows, activities=activities
119 )
120
121 self.logger.info(f"Registered for queue {task_queue}")
122 self.logger.info(f"Registered workflows {workflows}")
123 self.logger.info(f"Registered activites {activities}")
124
125 await worker.run()
126
127 def read_config_file(self, config_file):
128 try:
129 with open(config_file) as f:
130 return yaml.safe_load(f)
131 except Exception as e:
132 self.logger.critical("At config file '{}': {}".format(config_file, e))
133 exit(1)
134
135
136@workflow.defn
137class Heartbeat:
138 @workflow.run
139 async def run(self) -> str:
140 return "alive"
141
142
143if __name__ == "__main__":
Mark Beierl821bfc92023-01-24 21:15:25 -0500144 try:
145 opts, args = getopt.getopt(
146 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
147 )
148 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
149 config_file = None
150 for o, a in opts:
151 if o in ("-c", "--config"):
152 config_file = a
153 else:
154 assert False, "Unhandled option"
155
156 if config_file:
157 if not path.isfile(config_file):
158 print(
159 "configuration file '{}' does not exist".format(config_file),
160 file=sys.stderr,
161 )
162 exit(1)
163 else:
164 for config_file in (
165 __file__[: __file__.rfind(".")] + ".cfg",
166 "./lcm.cfg",
167 "/etc/osm/lcm.cfg",
168 ):
169 print(f"{config_file}")
170 if path.isfile(config_file):
171 break
172 else:
173 print(
174 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
175 file=sys.stderr,
176 )
177 exit(1)
178 lcm = NGLcm(config_file)
179 asyncio.run(lcm.start())
180 except (LcmException, getopt.GetoptError) as e:
181 print(str(e), file=sys.stderr)
182 # usage()
183 exit(1)