blob: 67b2a3f845c31a5ce8da3dd52a578511f2fd9745 [file] [log] [blame]
Mark Beierl821bfc92023-01-24 21:15:25 -05001#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##
19
20
21import asyncio
22import getopt
23import logging
24import logging.handlers
25import sys
26import yaml
27
28from osm_lcm.lcm_utils import LcmException
29
30from osm_common.dbbase import DbException
31from osm_lcm.data_utils.database.database import Database
32from osm_lcm.data_utils.lcm_config import LcmCfg
33from os import path
34from temporalio import workflow
35from temporalio.client import Client
36from temporalio.worker import Worker
37
38
class NGLcm:
    """
    LCM worker process: loads its configuration, sets up logging, connects to
    the database and runs a Temporal worker.
    """

    # Configuration object defined at class level, so it is shared by every
    # instance; __init__ populates it in place.
    main_config = LcmCfg()

    def __init__(self, config_file, loop=None):
        """
        Load the configuration, configure logging and connect to the database.

        :param config_file: path of the YAML configuration file to load.
        :param loop: accepted for backward compatibility; not used here.
        :raises LcmException: if the database layer raises DbException.
        :return: None
        """
        self.db = None

        # The logger must exist before reading the configuration, because
        # read_config_file reports its errors through it.
        self.logger = logging.getLogger("lcm")

        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        # NOTE(review): the whole configuration is dumped at CRITICAL level;
        # confirm it cannot contain credentials.
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        global_config = self.main_config.globalConfig
        if global_config.logfile:
            self.logger.addHandler(
                self._rotating_file_handler(global_config.logfile, log_formatter_simple)
            )
        if not global_config.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if global_config.to_dict()["loglevel"]:
            self.logger.setLevel(global_config.loglevel)

        # logging other modules
        config_dict = self.main_config.to_dict()
        for section in ("message", "database", "storage", "tsdb"):
            logger_config = config_dict[section]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                logger_module.addHandler(
                    self._rotating_file_handler(
                        logger_config["logfile"], log_formatter_simple
                    )
                )
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical("starting osm/nglcm")

        try:
            self.db = Database(self.main_config.to_dict()).instance.db
        except DbException as e:
            self.logger.critical(str(e), exc_info=True)
            # Chain the original error so its traceback is not lost.
            raise LcmException(str(e)) from e

    @staticmethod
    def _rotating_file_handler(logfile, formatter):
        """Build a size-rotated file handler for *logfile* that uses *formatter*."""
        handler = logging.handlers.RotatingFileHandler(
            logfile, maxBytes=int(100e6), backupCount=9, delay=False
        )
        handler.setFormatter(formatter)
        return handler

    async def start(self):
        """
        Connect to the Temporal server named in the configuration and run a
        worker on the "lcm-task-queue" task queue.

        The coroutine awaits ``worker.run()`` and returns only when that call
        returns.
        """
        temporal_api = (
            f"{self.main_config.temporal.host}:{self.main_config.temporal.port}"
        )
        self.logger.info(f"Attempting to register with Temporal at {temporal_api}")
        client = await Client.connect(temporal_api)

        task_queue = "lcm-task-queue"
        workflows = [
            Heartbeat,
        ]
        activities = []

        worker = Worker(
            client, task_queue=task_queue, workflows=workflows, activities=activities
        )

        self.logger.info(f"Registered for queue {task_queue}")
        self.logger.info(f"Registered workflows {workflows}")
        self.logger.info(f"Registered activites {activities}")

        await worker.run()

    def read_config_file(self, config_file):
        """
        Parse *config_file* as YAML and return the loaded content.

        Any failure while opening or parsing the file is logged at CRITICAL
        level and the process exits with status 1.

        :param config_file: path of the YAML file to read.
        :return: whatever ``yaml.safe_load`` produces for the file.
        """
        try:
            with open(config_file) as f:
                return yaml.safe_load(f)
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            # sys.exit rather than the builtin exit(): the latter is injected
            # by the site module and is not guaranteed to exist.
            sys.exit(1)
135
136
@workflow.defn
class Heartbeat:
    """Workflow whose run method returns a fixed liveness marker."""

    @workflow.run
    async def run(self) -> str:
        """Return the constant string "alive"."""
        status = "alive"
        return status
142
143
if __name__ == "__main__":
    # Script entry point: parse the command line, locate the configuration
    # file, build the NGLcm instance and run its Temporal worker.
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-c", "--config"):
                config_file = a
            elif o in ("-h", "--help"):
                print(
                    "Usage: {} [-h|--help] [-c|--config <config_file>]".format(
                        sys.argv[0]
                    )
                )
                sys.exit(0)
            else:
                # Options accepted by getopt but not implemented here (for
                # example --health-check). Raise instead of asserting: asserts
                # are stripped under -O and an AssertionError would bypass the
                # handler below.
                raise getopt.GetoptError("Unhandled option '{}'".format(o), o)

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                sys.exit(1)
        else:
            # No file given: try a .cfg next to this script, then the current
            # directory, then /etc/osm. The for/else fires when none exists.
            for config_file in (
                path.splitext(__file__)[0] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                print(f"{config_file}")
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                sys.exit(1)
        lcm = NGLcm(config_file)
        asyncio.run(lcm.start())
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)
185 exit(1)