blob: 2fc479feb3da55673ee9632f3950a1989eba26ab [file] [log] [blame]
tiernoc0e42e22018-05-11 11:36:10 +02001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
tierno2e215512018-11-28 09:37:52 +00004##
5# Copyright 2018 Telefonica S.A.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18##
19
quilesj7e13aeb2019-10-08 13:34:55 +020020
21# DEBUG WITH PDB
22import os
23import pdb
24
tiernoc0e42e22018-05-11 11:36:10 +020025import asyncio
26import yaml
tierno275411e2018-05-16 14:33:32 +020027import logging
28import logging.handlers
29import getopt
tierno275411e2018-05-16 14:33:32 +020030import sys
tierno59d22d22018-09-25 18:10:19 +020031
bravof73bac502021-05-11 07:38:47 -040032from osm_lcm import ns, vim_sdn, netslice
tierno69f0d382020-05-07 13:08:09 +000033from osm_lcm.ng_ro import NgRoException, NgRoClient
34from osm_lcm.ROclient import ROClient, ROClientException
quilesj7e13aeb2019-10-08 13:34:55 +020035
tierno94f06112020-02-11 12:38:19 +000036from time import time
tierno8069ce52019-08-28 15:34:33 +000037from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
tiernoa4dea5a2020-01-05 16:29:30 +000038from osm_lcm import version as lcm_version, version_date as lcm_version_date
tierno8069ce52019-08-28 15:34:33 +000039
bravof922c4172020-11-24 21:21:43 -030040from osm_common import msglocal, msgkafka
tierno98768132018-09-11 12:07:21 +020041from osm_common import version as common_version
tierno59d22d22018-09-25 18:10:19 +020042from osm_common.dbbase import DbException
tiernoc0e42e22018-05-11 11:36:10 +020043from osm_common.fsbase import FsException
44from osm_common.msgbase import MsgException
bravof922c4172020-11-24 21:21:43 -030045from osm_lcm.data_utils.database.database import Database
46from osm_lcm.data_utils.filesystem.filesystem import Filesystem
Luis Vegaa27dc532022-11-11 20:10:49 +000047from osm_lcm.data_utils.lcm_config import LcmCfg
aticig56b86c22022-06-29 10:43:05 +030048from osm_lcm.lcm_hc import get_health_check_file
Luis Vegaa27dc532022-11-11 20:10:49 +000049from os import path
tierno16427352019-04-22 11:37:36 +000050from random import choice as random_choice
tierno59d22d22018-09-25 18:10:19 +020051from n2vc import version as n2vc_version
bravof922c4172020-11-24 21:21:43 -030052import traceback
tiernoc0e42e22018-05-11 11:36:10 +020053
garciadeblas5697b8b2021-03-24 09:17:02 +010054if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
quilesj7e13aeb2019-10-08 13:34:55 +020055 pdb.set_trace()
56
tiernoc0e42e22018-05-11 11:36:10 +020057
tierno275411e2018-05-16 14:33:32 +020058__author__ = "Alfonso Tierno"
tiernoe64f7fb2019-09-11 08:55:52 +000059min_RO_version = "6.0.2"
tierno6e9d2eb2018-09-12 17:47:18 +020060min_n2vc_version = "0.0.2"
quilesj7e13aeb2019-10-08 13:34:55 +020061
tierno16427352019-04-22 11:37:36 +000062min_common_version = "0.1.19"
tierno275411e2018-05-16 14:33:32 +020063
64
tiernoc0e42e22018-05-11 11:36:10 +020065class Lcm:
garciadeblas5697b8b2021-03-24 09:17:02 +010066 ping_interval_pace = (
67 120 # how many time ping is send once is confirmed all is running
68 )
69 ping_interval_boot = 5 # how many time ping is sent when booting
Luis Vegaa27dc532022-11-11 20:10:49 +000070
71 main_config = LcmCfg()
tiernoa9843d82018-10-24 10:44:20 +020072
Gabriel Cubae7898982023-05-11 01:57:21 -050073 def __init__(self, config_file):
tiernoc0e42e22018-05-11 11:36:10 +020074 """
75 Init, Connect to database, filesystem storage, and messaging
76 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
77 :return: None
78 """
tiernoc0e42e22018-05-11 11:36:10 +020079 self.db = None
80 self.msg = None
tierno16427352019-04-22 11:37:36 +000081 self.msg_admin = None
tiernoc0e42e22018-05-11 11:36:10 +020082 self.fs = None
83 self.pings_not_received = 1
tiernoc2564fe2019-01-28 16:18:56 +000084 self.consecutive_errors = 0
85 self.first_start = False
tiernoc0e42e22018-05-11 11:36:10 +020086
tiernoc0e42e22018-05-11 11:36:10 +020087 # logging
garciadeblas5697b8b2021-03-24 09:17:02 +010088 self.logger = logging.getLogger("lcm")
tierno16427352019-04-22 11:37:36 +000089 # get id
90 self.worker_id = self.get_process_id()
tiernoc0e42e22018-05-11 11:36:10 +020091 # load configuration
92 config = self.read_config_file(config_file)
Luis Vegaa27dc532022-11-11 20:10:49 +000093 self.main_config.set_from_dict(config)
94 self.main_config.transform()
95 self.main_config.load_from_env()
96 self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
97 # TODO: check if lcm_hc.py is necessary
98 self.health_check_file = get_health_check_file(self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +010099 self.ns = (
100 self.netslice
101 ) = (
102 self.vim
103 ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
tiernoc0e42e22018-05-11 11:36:10 +0200104
105 # logging
garciadeblas5697b8b2021-03-24 09:17:02 +0100106 log_format_simple = (
107 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
108 )
109 log_formatter_simple = logging.Formatter(
110 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
111 )
Luis Vegaa27dc532022-11-11 20:10:49 +0000112 if self.main_config.globalConfig.logfile:
garciadeblas5697b8b2021-03-24 09:17:02 +0100113 file_handler = logging.handlers.RotatingFileHandler(
Luis Vegaa27dc532022-11-11 20:10:49 +0000114 self.main_config.globalConfig.logfile,
115 maxBytes=100e6,
116 backupCount=9,
117 delay=0,
garciadeblas5697b8b2021-03-24 09:17:02 +0100118 )
tiernoc0e42e22018-05-11 11:36:10 +0200119 file_handler.setFormatter(log_formatter_simple)
120 self.logger.addHandler(file_handler)
Luis Vegaa27dc532022-11-11 20:10:49 +0000121 if not self.main_config.globalConfig.to_dict()["nologging"]:
tiernoc0e42e22018-05-11 11:36:10 +0200122 str_handler = logging.StreamHandler()
123 str_handler.setFormatter(log_formatter_simple)
124 self.logger.addHandler(str_handler)
125
Luis Vegaa27dc532022-11-11 20:10:49 +0000126 if self.main_config.globalConfig.to_dict()["loglevel"]:
127 self.logger.setLevel(self.main_config.globalConfig.loglevel)
tiernoc0e42e22018-05-11 11:36:10 +0200128
129 # logging other modules
Luis Vegaa27dc532022-11-11 20:10:49 +0000130 for logger in ("message", "database", "storage", "tsdb"):
131 logger_config = self.main_config.to_dict()[logger]
132 logger_module = logging.getLogger(logger_config["logger_name"])
133 if logger_config["logfile"]:
garciadeblas5697b8b2021-03-24 09:17:02 +0100134 file_handler = logging.handlers.RotatingFileHandler(
Luis Vegaa27dc532022-11-11 20:10:49 +0000135 logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
garciadeblas5697b8b2021-03-24 09:17:02 +0100136 )
tiernoc0e42e22018-05-11 11:36:10 +0200137 file_handler.setFormatter(log_formatter_simple)
138 logger_module.addHandler(file_handler)
Luis Vegaa27dc532022-11-11 20:10:49 +0000139 if logger_config["loglevel"]:
140 logger_module.setLevel(logger_config["loglevel"])
garciadeblas5697b8b2021-03-24 09:17:02 +0100141 self.logger.critical(
142 "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
143 )
tierno59d22d22018-09-25 18:10:19 +0200144
tiernoc0e42e22018-05-11 11:36:10 +0200145 # check version of N2VC
146 # TODO enhance with int conversion or from distutils.version import LooseVersion
147 # or with list(map(int, version.split(".")))
tierno59d22d22018-09-25 18:10:19 +0200148 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100149 raise LcmException(
150 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
151 n2vc_version, min_n2vc_version
152 )
153 )
tierno59d22d22018-09-25 18:10:19 +0200154 # check version of common
tierno27246d82018-09-27 15:59:09 +0200155 if versiontuple(common_version) < versiontuple(min_common_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100156 raise LcmException(
157 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
158 common_version, min_common_version
159 )
160 )
tierno22f4f9c2018-06-11 18:53:39 +0200161
tiernoc0e42e22018-05-11 11:36:10 +0200162 try:
Luis Vegaa27dc532022-11-11 20:10:49 +0000163 self.db = Database(self.main_config.to_dict()).instance.db
tiernoc0e42e22018-05-11 11:36:10 +0200164
Luis Vegaa27dc532022-11-11 20:10:49 +0000165 self.fs = Filesystem(self.main_config.to_dict()).instance.fs
sousaedu40365e82021-07-26 15:24:21 +0200166 self.fs.sync()
tiernoc0e42e22018-05-11 11:36:10 +0200167
quilesj7e13aeb2019-10-08 13:34:55 +0200168 # copy message configuration in order to remove 'group_id' for msg_admin
Luis Vegaa27dc532022-11-11 20:10:49 +0000169 config_message = self.main_config.message.to_dict()
Gabriel Cubae7898982023-05-11 01:57:21 -0500170 config_message["loop"] = asyncio.get_event_loop()
tiernoc2564fe2019-01-28 16:18:56 +0000171 if config_message["driver"] == "local":
tiernoc0e42e22018-05-11 11:36:10 +0200172 self.msg = msglocal.MsgLocal()
tiernoc2564fe2019-01-28 16:18:56 +0000173 self.msg.connect(config_message)
tierno16427352019-04-22 11:37:36 +0000174 self.msg_admin = msglocal.MsgLocal()
175 config_message.pop("group_id", None)
176 self.msg_admin.connect(config_message)
tiernoc2564fe2019-01-28 16:18:56 +0000177 elif config_message["driver"] == "kafka":
tiernoc0e42e22018-05-11 11:36:10 +0200178 self.msg = msgkafka.MsgKafka()
tiernoc2564fe2019-01-28 16:18:56 +0000179 self.msg.connect(config_message)
tierno16427352019-04-22 11:37:36 +0000180 self.msg_admin = msgkafka.MsgKafka()
181 config_message.pop("group_id", None)
182 self.msg_admin.connect(config_message)
tiernoc0e42e22018-05-11 11:36:10 +0200183 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100184 raise LcmException(
185 "Invalid configuration param '{}' at '[message]':'driver'".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000186 self.main_config.message.driver
garciadeblas5697b8b2021-03-24 09:17:02 +0100187 )
188 )
tiernoc0e42e22018-05-11 11:36:10 +0200189 except (DbException, FsException, MsgException) as e:
190 self.logger.critical(str(e), exc_info=True)
191 raise LcmException(str(e))
192
kuused124bfe2019-06-18 12:09:24 +0200193 # contains created tasks/futures to be able to cancel
bravof922c4172020-11-24 21:21:43 -0300194 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
kuused124bfe2019-06-18 12:09:24 +0200195
tierno22f4f9c2018-06-11 18:53:39 +0200196 async def check_RO_version(self):
tiernoe64f7fb2019-09-11 08:55:52 +0000197 tries = 14
198 last_error = None
199 while True:
Luis Vegaa27dc532022-11-11 20:10:49 +0000200 ro_uri = self.main_config.RO.uri
201 if not ro_uri:
202 ro_uri = ""
tiernoe64f7fb2019-09-11 08:55:52 +0000203 try:
tierno2357f4e2020-10-19 16:38:59 +0000204 # try new RO, if fail old RO
205 try:
Luis Vegaa27dc532022-11-11 20:10:49 +0000206 self.main_config.RO.uri = ro_uri + "ro"
Gabriel Cubae7898982023-05-11 01:57:21 -0500207 ro_server = NgRoClient(**self.main_config.RO.to_dict())
tierno2357f4e2020-10-19 16:38:59 +0000208 ro_version = await ro_server.get_version()
Luis Vegaa27dc532022-11-11 20:10:49 +0000209 self.main_config.RO.ng = True
tierno2357f4e2020-10-19 16:38:59 +0000210 except Exception:
Luis Vegaa27dc532022-11-11 20:10:49 +0000211 self.main_config.RO.uri = ro_uri + "openmano"
Gabriel Cubae7898982023-05-11 01:57:21 -0500212 ro_server = ROClient(**self.main_config.RO.to_dict())
tierno2357f4e2020-10-19 16:38:59 +0000213 ro_version = await ro_server.get_version()
Luis Vegaa27dc532022-11-11 20:10:49 +0000214 self.main_config.RO.ng = False
tiernoe64f7fb2019-09-11 08:55:52 +0000215 if versiontuple(ro_version) < versiontuple(min_RO_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100216 raise LcmException(
217 "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
218 ro_version, min_RO_version
219 )
220 )
221 self.logger.info(
222 "Connected to RO version {} new-generation version {}".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000223 ro_version, self.main_config.RO.ng
garciadeblas5697b8b2021-03-24 09:17:02 +0100224 )
225 )
tiernoe64f7fb2019-09-11 08:55:52 +0000226 return
tierno69f0d382020-05-07 13:08:09 +0000227 except (ROClientException, NgRoException) as e:
Luis Vegaa27dc532022-11-11 20:10:49 +0000228 self.main_config.RO.uri = ro_uri
tiernoe64f7fb2019-09-11 08:55:52 +0000229 tries -= 1
bravof922c4172020-11-24 21:21:43 -0300230 traceback.print_tb(e.__traceback__)
garciadeblas5697b8b2021-03-24 09:17:02 +0100231 error_text = "Error while connecting to RO on {}: {}".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000232 self.main_config.RO.uri, e
garciadeblas5697b8b2021-03-24 09:17:02 +0100233 )
tiernoe64f7fb2019-09-11 08:55:52 +0000234 if tries <= 0:
235 self.logger.critical(error_text)
236 raise LcmException(error_text)
237 if last_error != error_text:
238 last_error = error_text
garciadeblas5697b8b2021-03-24 09:17:02 +0100239 self.logger.error(
240 error_text + ". Waiting until {} seconds".format(5 * tries)
241 )
tiernoe64f7fb2019-09-11 08:55:52 +0000242 await asyncio.sleep(5)
tierno22f4f9c2018-06-11 18:53:39 +0200243
tiernoc0e42e22018-05-11 11:36:10 +0200244 async def test(self, param=None):
245 self.logger.debug("Starting/Ending test task: {}".format(param))
246
tiernoc0e42e22018-05-11 11:36:10 +0200247 async def kafka_ping(self):
248 self.logger.debug("Task kafka_ping Enter")
249 consecutive_errors = 0
250 first_start = True
251 kafka_has_received = False
252 self.pings_not_received = 1
253 while True:
254 try:
tierno16427352019-04-22 11:37:36 +0000255 await self.msg_admin.aiowrite(
garciadeblas5697b8b2021-03-24 09:17:02 +0100256 "admin",
257 "ping",
258 {
259 "from": "lcm",
260 "to": "lcm",
261 "worker_id": self.worker_id,
262 "version": lcm_version,
263 },
garciadeblas5697b8b2021-03-24 09:17:02 +0100264 )
tiernoc0e42e22018-05-11 11:36:10 +0200265 # time between pings are low when it is not received and at starting
garciadeblas5697b8b2021-03-24 09:17:02 +0100266 wait_time = (
267 self.ping_interval_boot
268 if not kafka_has_received
269 else self.ping_interval_pace
270 )
tiernoc0e42e22018-05-11 11:36:10 +0200271 if not self.pings_not_received:
272 kafka_has_received = True
273 self.pings_not_received += 1
Gabriel Cubae7898982023-05-11 01:57:21 -0500274 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200275 if self.pings_not_received > 10:
276 raise LcmException("It is not receiving pings from Kafka bus")
277 consecutive_errors = 0
278 first_start = False
279 except LcmException:
280 raise
281 except Exception as e:
282 # if not first_start is the first time after starting. So leave more time and wait
283 # to allow kafka starts
284 if consecutive_errors == 8 if not first_start else 30:
garciadeblas5697b8b2021-03-24 09:17:02 +0100285 self.logger.error(
286 "Task kafka_read task exit error too many errors. Exception: {}".format(
287 e
288 )
289 )
tiernoc0e42e22018-05-11 11:36:10 +0200290 raise
291 consecutive_errors += 1
garciadeblas5697b8b2021-03-24 09:17:02 +0100292 self.logger.error(
293 "Task kafka_read retrying after Exception {}".format(e)
294 )
tierno16427352019-04-22 11:37:36 +0000295 wait_time = 2 if not first_start else 5
Gabriel Cubae7898982023-05-11 01:57:21 -0500296 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200297
gcalvinoed7f6d42018-12-14 14:44:56 +0100298 def kafka_read_callback(self, topic, command, params):
299 order_id = 1
300
301 if topic != "admin" and command != "ping":
garciadeblas5697b8b2021-03-24 09:17:02 +0100302 self.logger.debug(
303 "Task kafka_read receives {} {}: {}".format(topic, command, params)
304 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100305 self.consecutive_errors = 0
306 self.first_start = False
307 order_id += 1
308 if command == "exit":
309 raise LcmExceptionExit
310 elif command.startswith("#"):
311 return
312 elif command == "echo":
313 # just for test
314 print(params)
315 sys.stdout.flush()
316 return
317 elif command == "test":
Gabriel Cubae7898982023-05-11 01:57:21 -0500318 asyncio.Task(self.test(params))
gcalvinoed7f6d42018-12-14 14:44:56 +0100319 return
320
321 if topic == "admin":
322 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
tierno16427352019-04-22 11:37:36 +0000323 if params.get("worker_id") != self.worker_id:
324 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100325 self.pings_not_received = 0
tierno3e359b12019-02-03 02:29:13 +0100326 try:
aticig56b86c22022-06-29 10:43:05 +0300327 with open(self.health_check_file, "w") as f:
tierno3e359b12019-02-03 02:29:13 +0100328 f.write(str(time()))
329 except Exception as e:
garciadeblas5697b8b2021-03-24 09:17:02 +0100330 self.logger.error(
331 "Cannot write into '{}' for healthcheck: {}".format(
aticig56b86c22022-06-29 10:43:05 +0300332 self.health_check_file, e
garciadeblas5697b8b2021-03-24 09:17:02 +0100333 )
334 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100335 return
magnussonle9198bb2020-01-21 13:00:51 +0100336 elif topic == "pla":
337 if command == "placement":
338 self.ns.update_nsrs_with_pla_result(params)
339 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100340 elif topic == "k8scluster":
341 if command == "create" or command == "created":
342 k8scluster_id = params.get("_id")
343 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100344 self.lcm_tasks.register(
345 "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
346 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100347 return
dariofaccin8bbeeb02023-01-23 18:13:27 +0100348 elif command == "edit" or command == "edited":
349 k8scluster_id = params.get("_id")
350 task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
351 self.lcm_tasks.register(
352 "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
353 )
354 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100355 elif command == "delete" or command == "deleted":
356 k8scluster_id = params.get("_id")
357 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100358 self.lcm_tasks.register(
359 "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
360 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100361 return
David Garciac1fe90a2021-03-31 19:12:02 +0200362 elif topic == "vca":
363 if command == "create" or command == "created":
364 vca_id = params.get("_id")
365 task = asyncio.ensure_future(self.vca.create(params, order_id))
366 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
367 return
Dario Faccin8e53c6d2023-01-10 10:38:41 +0000368 elif command == "edit" or command == "edited":
369 vca_id = params.get("_id")
370 task = asyncio.ensure_future(self.vca.edit(params, order_id))
371 self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
372 return
David Garciac1fe90a2021-03-31 19:12:02 +0200373 elif command == "delete" or command == "deleted":
374 vca_id = params.get("_id")
375 task = asyncio.ensure_future(self.vca.delete(params, order_id))
376 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
377 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100378 elif topic == "k8srepo":
379 if command == "create" or command == "created":
380 k8srepo_id = params.get("_id")
381 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
382 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100383 self.lcm_tasks.register(
384 "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
385 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100386 return
387 elif command == "delete" or command == "deleted":
388 k8srepo_id = params.get("_id")
389 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100390 self.lcm_tasks.register(
391 "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
392 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100393 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100394 elif topic == "ns":
tierno307425f2020-01-26 23:35:59 +0000395 if command == "instantiate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100396 # self.logger.debug("Deploying NS {}".format(nsr_id))
397 nslcmop = params
398 nslcmop_id = nslcmop["_id"]
399 nsr_id = nslcmop["nsInstanceId"]
400 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100401 self.lcm_tasks.register(
402 "ns", nsr_id, nslcmop_id, "ns_instantiate", task
403 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100404 return
tierno307425f2020-01-26 23:35:59 +0000405 elif command == "terminate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100406 # self.logger.debug("Deleting NS {}".format(nsr_id))
407 nslcmop = params
408 nslcmop_id = nslcmop["_id"]
409 nsr_id = nslcmop["nsInstanceId"]
410 self.lcm_tasks.cancel(topic, nsr_id)
411 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
412 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
413 return
ksaikiranr3fde2c72021-03-15 10:39:06 +0530414 elif command == "vca_status_refresh":
415 nslcmop = params
416 nslcmop_id = nslcmop["_id"]
417 nsr_id = nslcmop["nsInstanceId"]
garciadeblas5697b8b2021-03-24 09:17:02 +0100418 task = asyncio.ensure_future(
419 self.ns.vca_status_refresh(nsr_id, nslcmop_id)
420 )
421 self.lcm_tasks.register(
422 "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
423 )
ksaikiranr3fde2c72021-03-15 10:39:06 +0530424 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100425 elif command == "action":
426 # self.logger.debug("Update NS {}".format(nsr_id))
427 nslcmop = params
428 nslcmop_id = nslcmop["_id"]
429 nsr_id = nslcmop["nsInstanceId"]
430 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
431 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
432 return
aticigdffa6212022-04-12 15:27:53 +0300433 elif command == "update":
434 # self.logger.debug("Update NS {}".format(nsr_id))
435 nslcmop = params
436 nslcmop_id = nslcmop["_id"]
437 nsr_id = nslcmop["nsInstanceId"]
438 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
439 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
440 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100441 elif command == "scale":
442 # self.logger.debug("Update NS {}".format(nsr_id))
443 nslcmop = params
444 nslcmop_id = nslcmop["_id"]
445 nsr_id = nslcmop["nsInstanceId"]
446 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
447 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
448 return
garciadeblas07f4e4c2022-06-09 09:42:58 +0200449 elif command == "heal":
450 # self.logger.debug("Healing NS {}".format(nsr_id))
451 nslcmop = params
452 nslcmop_id = nslcmop["_id"]
453 nsr_id = nslcmop["nsInstanceId"]
454 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
preethika.p28b0bf82022-09-23 07:36:28 +0000455 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
garciadeblas07f4e4c2022-06-09 09:42:58 +0200456 return
elumalai80bcf1c2022-04-28 18:05:01 +0530457 elif command == "migrate":
458 nslcmop = params
459 nslcmop_id = nslcmop["_id"]
460 nsr_id = nslcmop["nsInstanceId"]
461 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
462 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
463 return
govindarajul4ff4b512022-05-02 20:02:41 +0530464 elif command == "verticalscale":
465 nslcmop = params
466 nslcmop_id = nslcmop["_id"]
467 nsr_id = nslcmop["nsInstanceId"]
468 task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
preethika.p28b0bf82022-09-23 07:36:28 +0000469 self.logger.debug(
470 "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
471 )
472 self.lcm_tasks.register(
473 "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
474 )
475 self.logger.debug(
476 "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
477 )
govindarajul4ff4b512022-05-02 20:02:41 +0530478 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100479 elif command == "show":
tiernoc2564fe2019-01-28 16:18:56 +0000480 nsr_id = params
gcalvinoed7f6d42018-12-14 14:44:56 +0100481 try:
482 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
garciadeblas5697b8b2021-03-24 09:17:02 +0100483 print(
484 "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
485 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
486 "".format(
487 nsr_id,
488 db_nsr["operational-status"],
489 db_nsr["config-status"],
490 db_nsr["detailed-status"],
491 db_nsr["_admin"]["deployed"],
Gabriel Cuba411af2e2023-01-06 17:23:22 -0500492 self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
garciadeblas5697b8b2021-03-24 09:17:02 +0100493 )
494 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100495 except Exception as e:
496 print("nsr {} not found: {}".format(nsr_id, e))
497 sys.stdout.flush()
498 return
499 elif command == "deleted":
500 return # TODO cleaning of task just in case should be done
garciadeblas5697b8b2021-03-24 09:17:02 +0100501 elif command in (
elumalaica7ece02022-04-12 12:47:32 +0530502 "vnf_terminated",
elumalaib9e357c2022-04-27 09:58:38 +0530503 "policy_updated",
garciadeblas5697b8b2021-03-24 09:17:02 +0100504 "terminated",
505 "instantiated",
506 "scaled",
garciadeblas07f4e4c2022-06-09 09:42:58 +0200507 "healed",
garciadeblas5697b8b2021-03-24 09:17:02 +0100508 "actioned",
aticigdffa6212022-04-12 15:27:53 +0300509 "updated",
elumalai80bcf1c2022-04-28 18:05:01 +0530510 "migrated",
govindarajul4ff4b512022-05-02 20:02:41 +0530511 "verticalscaled",
garciadeblas5697b8b2021-03-24 09:17:02 +0100512 ): # "scaled-cooldown-time"
gcalvinoed7f6d42018-12-14 14:44:56 +0100513 return
elumalaica7ece02022-04-12 12:47:32 +0530514
gcalvinoed7f6d42018-12-14 14:44:56 +0100515 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
tierno307425f2020-01-26 23:35:59 +0000516 if command == "instantiate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100517 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
518 nsilcmop = params
519 nsilcmop_id = nsilcmop["_id"] # slice operation id
520 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
garciadeblas5697b8b2021-03-24 09:17:02 +0100521 task = asyncio.ensure_future(
522 self.netslice.instantiate(nsir_id, nsilcmop_id)
523 )
524 self.lcm_tasks.register(
525 "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
526 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100527 return
tierno307425f2020-01-26 23:35:59 +0000528 elif command == "terminate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100529 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
530 nsilcmop = params
531 nsilcmop_id = nsilcmop["_id"] # slice operation id
532 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
533 self.lcm_tasks.cancel(topic, nsir_id)
garciadeblas5697b8b2021-03-24 09:17:02 +0100534 task = asyncio.ensure_future(
535 self.netslice.terminate(nsir_id, nsilcmop_id)
536 )
537 self.lcm_tasks.register(
538 "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
539 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100540 return
541 elif command == "show":
tiernoc2564fe2019-01-28 16:18:56 +0000542 nsir_id = params
gcalvinoed7f6d42018-12-14 14:44:56 +0100543 try:
544 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
garciadeblas5697b8b2021-03-24 09:17:02 +0100545 print(
546 "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
547 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
548 "".format(
549 nsir_id,
550 db_nsir["operational-status"],
551 db_nsir["config-status"],
552 db_nsir["detailed-status"],
553 db_nsir["_admin"]["deployed"],
Gabriel Cuba411af2e2023-01-06 17:23:22 -0500554 self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
garciadeblas5697b8b2021-03-24 09:17:02 +0100555 )
556 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100557 except Exception as e:
558 print("nsir {} not found: {}".format(nsir_id, e))
559 sys.stdout.flush()
560 return
561 elif command == "deleted":
562 return # TODO cleaning of task just in case should be done
garciadeblas5697b8b2021-03-24 09:17:02 +0100563 elif command in (
564 "terminated",
565 "instantiated",
566 "scaled",
garciadeblas07f4e4c2022-06-09 09:42:58 +0200567 "healed",
garciadeblas5697b8b2021-03-24 09:17:02 +0100568 "actioned",
569 ): # "scaled-cooldown-time"
gcalvinoed7f6d42018-12-14 14:44:56 +0100570 return
571 elif topic == "vim_account":
572 vim_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000573 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000574 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000575 task = asyncio.ensure_future(self.vim.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100576 self.lcm_tasks.register(
577 "vim_account", vim_id, order_id, "vim_create", task
578 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100579 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100580 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100581 self.lcm_tasks.cancel(topic, vim_id)
kuuse6a470c62019-07-10 13:52:45 +0200582 task = asyncio.ensure_future(self.vim.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100583 self.lcm_tasks.register(
584 "vim_account", vim_id, order_id, "vim_delete", task
585 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100586 return
587 elif command == "show":
588 print("not implemented show with vim_account")
589 sys.stdout.flush()
590 return
tiernof210c1c2019-10-16 09:09:58 +0000591 elif command in ("edit", "edited"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000592 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000593 task = asyncio.ensure_future(self.vim.edit(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100594 self.lcm_tasks.register(
595 "vim_account", vim_id, order_id, "vim_edit", task
596 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100597 return
tiernof210c1c2019-10-16 09:09:58 +0000598 elif command == "deleted":
599 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100600 elif topic == "wim_account":
601 wim_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000602 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000603 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000604 task = asyncio.ensure_future(self.wim.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100605 self.lcm_tasks.register(
606 "wim_account", wim_id, order_id, "wim_create", task
607 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100608 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100609 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100610 self.lcm_tasks.cancel(topic, wim_id)
kuuse6a470c62019-07-10 13:52:45 +0200611 task = asyncio.ensure_future(self.wim.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100612 self.lcm_tasks.register(
613 "wim_account", wim_id, order_id, "wim_delete", task
614 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100615 return
616 elif command == "show":
617 print("not implemented show with wim_account")
618 sys.stdout.flush()
619 return
tiernof210c1c2019-10-16 09:09:58 +0000620 elif command in ("edit", "edited"):
gcalvinoed7f6d42018-12-14 14:44:56 +0100621 task = asyncio.ensure_future(self.wim.edit(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100622 self.lcm_tasks.register(
623 "wim_account", wim_id, order_id, "wim_edit", task
624 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100625 return
tiernof210c1c2019-10-16 09:09:58 +0000626 elif command == "deleted":
627 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100628 elif topic == "sdn":
629 _sdn_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000630 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000631 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000632 task = asyncio.ensure_future(self.sdn.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100633 self.lcm_tasks.register(
634 "sdn", _sdn_id, order_id, "sdn_create", task
635 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100636 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100637 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100638 self.lcm_tasks.cancel(topic, _sdn_id)
kuuse6a470c62019-07-10 13:52:45 +0200639 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
gcalvinoed7f6d42018-12-14 14:44:56 +0100640 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
641 return
tiernof210c1c2019-10-16 09:09:58 +0000642 elif command in ("edit", "edited"):
gcalvinoed7f6d42018-12-14 14:44:56 +0100643 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
644 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
645 return
tiernof210c1c2019-10-16 09:09:58 +0000646 elif command == "deleted":
647 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100648 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
649
tiernoc0e42e22018-05-11 11:36:10 +0200650 async def kafka_read(self):
garciadeblas5697b8b2021-03-24 09:17:02 +0100651 self.logger.debug(
652 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
653 )
tiernoc0e42e22018-05-11 11:36:10 +0200654 # future = asyncio.Future()
gcalvinoed7f6d42018-12-14 14:44:56 +0100655 self.consecutive_errors = 0
656 self.first_start = True
657 while self.consecutive_errors < 10:
tiernoc0e42e22018-05-11 11:36:10 +0200658 try:
garciadeblas5697b8b2021-03-24 09:17:02 +0100659 topics = (
660 "ns",
661 "vim_account",
662 "wim_account",
663 "sdn",
664 "nsi",
665 "k8scluster",
666 "vca",
667 "k8srepo",
668 "pla",
669 )
670 topics_admin = ("admin",)
tierno16427352019-04-22 11:37:36 +0000671 await asyncio.gather(
garciadeblas5697b8b2021-03-24 09:17:02 +0100672 self.msg.aioread(
Gabriel Cubae7898982023-05-11 01:57:21 -0500673 topics, self.kafka_read_callback, from_beginning=True
garciadeblas5697b8b2021-03-24 09:17:02 +0100674 ),
675 self.msg_admin.aioread(
676 topics_admin,
garciadeblas5697b8b2021-03-24 09:17:02 +0100677 self.kafka_read_callback,
678 group_id=False,
679 ),
tierno16427352019-04-22 11:37:36 +0000680 )
tiernoc0e42e22018-05-11 11:36:10 +0200681
gcalvinoed7f6d42018-12-14 14:44:56 +0100682 except LcmExceptionExit:
683 self.logger.debug("Bye!")
684 break
tiernoc0e42e22018-05-11 11:36:10 +0200685 except Exception as e:
686 # if not first_start is the first time after starting. So leave more time and wait
687 # to allow kafka starts
gcalvinoed7f6d42018-12-14 14:44:56 +0100688 if self.consecutive_errors == 8 if not self.first_start else 30:
garciadeblas5697b8b2021-03-24 09:17:02 +0100689 self.logger.error(
690 "Task kafka_read task exit error too many errors. Exception: {}".format(
691 e
692 )
693 )
tiernoc0e42e22018-05-11 11:36:10 +0200694 raise
gcalvinoed7f6d42018-12-14 14:44:56 +0100695 self.consecutive_errors += 1
garciadeblas5697b8b2021-03-24 09:17:02 +0100696 self.logger.error(
697 "Task kafka_read retrying after Exception {}".format(e)
698 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100699 wait_time = 2 if not self.first_start else 5
Gabriel Cubae7898982023-05-11 01:57:21 -0500700 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200701
702 # self.logger.debug("Task kafka_read terminating")
703 self.logger.debug("Task kafka_read exit")
704
Gabriel Cubae7898982023-05-11 01:57:21 -0500705 async def kafka_read_ping(self):
706 await asyncio.gather(self.kafka_read(), self.kafka_ping())
707
Mark Beierl1addc932023-05-18 15:11:34 -0400708 async def start(self):
tierno22f4f9c2018-06-11 18:53:39 +0200709 # check RO version
Mark Beierl1addc932023-05-18 15:11:34 -0400710 await self.check_RO_version()
tierno22f4f9c2018-06-11 18:53:39 +0200711
Gabriel Cubae7898982023-05-11 01:57:21 -0500712 self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
Luis Vegaa27dc532022-11-11 20:10:49 +0000713 # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
garciadeblas5697b8b2021-03-24 09:17:02 +0100714 self.netslice = netslice.NetsliceLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -0500715 self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
garciadeblas5697b8b2021-03-24 09:17:02 +0100716 )
Gabriel Cubae7898982023-05-11 01:57:21 -0500717 self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
718 self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
719 self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +0100720 self.k8scluster = vim_sdn.K8sClusterLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -0500721 self.msg, self.lcm_tasks, self.main_config.to_dict()
garciadeblas5697b8b2021-03-24 09:17:02 +0100722 )
Gabriel Cubae7898982023-05-11 01:57:21 -0500723 self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +0100724 self.k8srepo = vim_sdn.K8sRepoLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -0500725 self.msg, self.lcm_tasks, self.main_config.to_dict()
garciadeblas5697b8b2021-03-24 09:17:02 +0100726 )
tierno2357f4e2020-10-19 16:38:59 +0000727
Mark Beierl1addc932023-05-18 15:11:34 -0400728 await self.kafka_read_ping()
bravof73bac502021-05-11 07:38:47 -0400729
tiernoc0e42e22018-05-11 11:36:10 +0200730 # TODO
731 # self.logger.debug("Terminating cancelling creation tasks")
tiernoca2e16a2018-06-29 15:25:24 +0200732 # self.lcm_tasks.cancel("ALL", "create")
tiernoc0e42e22018-05-11 11:36:10 +0200733 # timeout = 200
734 # while self.is_pending_tasks():
735 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
Gabriel Cubae7898982023-05-11 01:57:21 -0500736 # await asyncio.sleep(2)
tiernoc0e42e22018-05-11 11:36:10 +0200737 # timeout -= 2
738 # if not timeout:
tiernoca2e16a2018-06-29 15:25:24 +0200739 # self.lcm_tasks.cancel("ALL", "ALL")
tiernoc0e42e22018-05-11 11:36:10 +0200740 if self.db:
741 self.db.db_disconnect()
742 if self.msg:
743 self.msg.disconnect()
tierno16427352019-04-22 11:37:36 +0000744 if self.msg_admin:
745 self.msg_admin.disconnect()
tiernoc0e42e22018-05-11 11:36:10 +0200746 if self.fs:
747 self.fs.fs_disconnect()
748
tiernoc0e42e22018-05-11 11:36:10 +0200749 def read_config_file(self, config_file):
tiernoc0e42e22018-05-11 11:36:10 +0200750 try:
Gabriel Cubaa89a5a72022-11-26 18:55:15 -0500751 with open(config_file) as f:
752 return yaml.safe_load(f)
tiernoc0e42e22018-05-11 11:36:10 +0200753 except Exception as e:
754 self.logger.critical("At config file '{}': {}".format(config_file, e))
Gabriel Cubaa89a5a72022-11-26 18:55:15 -0500755 exit(1)
tiernoc0e42e22018-05-11 11:36:10 +0200756
tierno16427352019-04-22 11:37:36 +0000757 @staticmethod
758 def get_process_id():
759 """
760 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
761 will provide a random one
762 :return: Obtained ID
763 """
764 # Try getting docker id. If fails, get pid
765 try:
766 with open("/proc/self/cgroup", "r") as f:
767 text_id_ = f.readline()
768 _, _, text_id = text_id_.rpartition("/")
garciadeblas5697b8b2021-03-24 09:17:02 +0100769 text_id = text_id.replace("\n", "")[:12]
tierno16427352019-04-22 11:37:36 +0000770 if text_id:
771 return text_id
772 except Exception:
773 pass
774 # Return a random id
garciadeblas5697b8b2021-03-24 09:17:02 +0100775 return "".join(random_choice("0123456789abcdef") for _ in range(12))
tierno16427352019-04-22 11:37:36 +0000776
tiernoc0e42e22018-05-11 11:36:10 +0200777
tierno275411e2018-05-16 14:33:32 +0200778def usage():
garciadeblas5697b8b2021-03-24 09:17:02 +0100779 print(
780 """Usage: {} [options]
quilesj7e13aeb2019-10-08 13:34:55 +0200781 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
tiernoa9843d82018-10-24 10:44:20 +0200782 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
tierno275411e2018-05-16 14:33:32 +0200783 -h|--help: shows this help
garciadeblas5697b8b2021-03-24 09:17:02 +0100784 """.format(
785 sys.argv[0]
786 )
787 )
tierno750b2452018-05-17 16:39:29 +0200788 # --log-socket-host HOST: send logs to this host")
789 # --log-socket-port PORT: send logs using this port (default: 9022)")
tierno275411e2018-05-16 14:33:32 +0200790
791
garciadeblas5697b8b2021-03-24 09:17:02 +0100792if __name__ == "__main__":
tierno275411e2018-05-16 14:33:32 +0200793 try:
tierno8c16b052020-02-05 15:08:32 +0000794 # print("SYS.PATH='{}'".format(sys.path))
tierno275411e2018-05-16 14:33:32 +0200795 # load parameters and configuration
quilesj7e13aeb2019-10-08 13:34:55 +0200796 # -h
797 # -c value
798 # --config value
799 # --help
800 # --health-check
garciadeblas5697b8b2021-03-24 09:17:02 +0100801 opts, args = getopt.getopt(
802 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
803 )
tierno275411e2018-05-16 14:33:32 +0200804 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
805 config_file = None
806 for o, a in opts:
807 if o in ("-h", "--help"):
808 usage()
809 sys.exit()
810 elif o in ("-c", "--config"):
811 config_file = a
tiernoa9843d82018-10-24 10:44:20 +0200812 elif o == "--health-check":
tierno94f06112020-02-11 12:38:19 +0000813 from osm_lcm.lcm_hc import health_check
garciadeblas5697b8b2021-03-24 09:17:02 +0100814
aticig56b86c22022-06-29 10:43:05 +0300815 health_check(config_file, Lcm.ping_interval_pace)
tierno275411e2018-05-16 14:33:32 +0200816 # elif o == "--log-socket-port":
817 # log_socket_port = a
818 # elif o == "--log-socket-host":
819 # log_socket_host = a
820 # elif o == "--log-file":
821 # log_file = a
822 else:
823 assert False, "Unhandled option"
quilesj7e13aeb2019-10-08 13:34:55 +0200824
tierno275411e2018-05-16 14:33:32 +0200825 if config_file:
826 if not path.isfile(config_file):
garciadeblas5697b8b2021-03-24 09:17:02 +0100827 print(
828 "configuration file '{}' does not exist".format(config_file),
829 file=sys.stderr,
830 )
tierno275411e2018-05-16 14:33:32 +0200831 exit(1)
832 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100833 for config_file in (
834 __file__[: __file__.rfind(".")] + ".cfg",
835 "./lcm.cfg",
836 "/etc/osm/lcm.cfg",
837 ):
tierno275411e2018-05-16 14:33:32 +0200838 if path.isfile(config_file):
839 break
840 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100841 print(
842 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
843 file=sys.stderr,
844 )
tierno275411e2018-05-16 14:33:32 +0200845 exit(1)
846 lcm = Lcm(config_file)
Mark Beierl1addc932023-05-18 15:11:34 -0400847 asyncio.run(lcm.start())
tierno22f4f9c2018-06-11 18:53:39 +0200848 except (LcmException, getopt.GetoptError) as e:
tierno275411e2018-05-16 14:33:32 +0200849 print(str(e), file=sys.stderr)
850 # usage()
851 exit(1)