blob: 1c81da110329ee4e714da5e3475d73c0922d4d12 [file] [log] [blame]
tiernoc0e42e22018-05-11 11:36:10 +02001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
tierno2e215512018-11-28 09:37:52 +00004##
5# Copyright 2018 Telefonica S.A.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18##
19
quilesj7e13aeb2019-10-08 13:34:55 +020020
21# DEBUG WITH PDB
quilesj7e13aeb2019-10-08 13:34:55 +020022import pdb
23
k4.rahul44306072023-05-05 14:24:31 +053024import os
tiernoc0e42e22018-05-11 11:36:10 +020025import asyncio
26import yaml
tierno275411e2018-05-16 14:33:32 +020027import logging
28import logging.handlers
29import getopt
tierno275411e2018-05-16 14:33:32 +020030import sys
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050031from random import SystemRandom
tierno59d22d22018-09-25 18:10:19 +020032
bravof73bac502021-05-11 07:38:47 -040033from osm_lcm import ns, vim_sdn, netslice
tierno69f0d382020-05-07 13:08:09 +000034from osm_lcm.ng_ro import NgRoException, NgRoClient
35from osm_lcm.ROclient import ROClient, ROClientException
quilesj7e13aeb2019-10-08 13:34:55 +020036
tierno94f06112020-02-11 12:38:19 +000037from time import time
tierno8069ce52019-08-28 15:34:33 +000038from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
tiernoa4dea5a2020-01-05 16:29:30 +000039from osm_lcm import version as lcm_version, version_date as lcm_version_date
tierno8069ce52019-08-28 15:34:33 +000040
bravof922c4172020-11-24 21:21:43 -030041from osm_common import msglocal, msgkafka
tierno98768132018-09-11 12:07:21 +020042from osm_common import version as common_version
tierno59d22d22018-09-25 18:10:19 +020043from osm_common.dbbase import DbException
tiernoc0e42e22018-05-11 11:36:10 +020044from osm_common.fsbase import FsException
45from osm_common.msgbase import MsgException
bravof922c4172020-11-24 21:21:43 -030046from osm_lcm.data_utils.database.database import Database
47from osm_lcm.data_utils.filesystem.filesystem import Filesystem
Luis Vegaa27dc532022-11-11 20:10:49 +000048from osm_lcm.data_utils.lcm_config import LcmCfg
aticig56b86c22022-06-29 10:43:05 +030049from osm_lcm.lcm_hc import get_health_check_file
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050050from os import path, getenv
tierno59d22d22018-09-25 18:10:19 +020051from n2vc import version as n2vc_version
bravof922c4172020-11-24 21:21:43 -030052import traceback
tiernoc0e42e22018-05-11 11:36:10 +020053
# Opt-in debugging hook: when OSMLCM_PDB_DEBUG is present in the environment
# (whatever its value, even empty) the debugger is entered while this module loads.
if getenv("OSMLCM_PDB_DEBUG") is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"

# Minimum versions accepted for the components LCM works with. They are
# compared against the running versions by means of versiontuple().
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"
min_RO_version = "6.0.2"
tierno275411e2018-05-16 14:33:32 +020063
64
class Lcm:
    # seconds between two pings while booting, that is, until an own ping
    # has been read back from the bus
    ping_interval_boot = 5
    # seconds between two pings once it is confirmed that all is running
    ping_interval_pace = 120

    # configuration object, created once at class level and filled in by __init__
    main_config = LcmCfg()
tiernoa9843d82018-10-24 10:44:20 +020072
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration file. It is read with self.read_config_file() and the result is loaded
        into self.main_config with set_from_dict(); transform() and load_from_env() are applied afterwards
    :return: None
    :raises LcmException: if the N2VC or common versions are lower than the minimum required, if the message
        driver is neither 'local' nor 'kafka', or if connecting to database, filesystem or message bus raises
        DbException, FsException or MsgException
    """
    self.db = None
    self.msg = None
    self.msg_admin = None
    self.fs = None
    self.pings_not_received = 1
    self.consecutive_errors = 0
    self.first_start = False

    # logging
    self.logger = logging.getLogger("lcm")
    # get id
    self.worker_id = self.get_process_id()
    # load configuration
    config = self.read_config_file(config_file)
    self.main_config.set_from_dict(config)
    self.main_config.transform()
    self.main_config.load_from_env()
    # NOTE(review): the whole configuration is written to the log; if it can carry credentials
    # consider masking them here - to be confirmed against LcmCfg
    self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
    # TODO: check if lcm_hc.py is necessary
    self.health_check_file = get_health_check_file(self.main_config.to_dict())

    # operation handlers; they are not created here
    self.ns = None
    self.netslice = None
    self.vim = None
    self.wim = None
    self.sdn = None
    self.k8scluster = None
    self.vca = None
    self.k8srepo = None

    # logging
    log_format_simple = (
        "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    )
    log_formatter_simple = logging.Formatter(
        log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
    )
    if self.main_config.globalConfig.logfile:
        file_handler = logging.handlers.RotatingFileHandler(
            self.main_config.globalConfig.logfile,
            maxBytes=100e6,
            backupCount=9,
            delay=0,
        )
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    if not self.main_config.globalConfig.to_dict()["nologging"]:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if self.main_config.globalConfig.to_dict()["loglevel"]:
        self.logger.setLevel(self.main_config.globalConfig.loglevel)

    # logging other modules. The configuration is only read in this loop, so it is dumped once
    modules_config = self.main_config.to_dict()
    for section in ("message", "database", "storage", "tsdb"):
        logger_config = modules_config[section]
        logger_module = logging.getLogger(logger_config["logger_name"])
        if logger_config["logfile"]:
            file_handler = logging.handlers.RotatingFileHandler(
                logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
            )
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if logger_config["loglevel"]:
            logger_module.setLevel(logger_config["loglevel"])
    self.logger.critical(
        "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
    )

    # check version of N2VC
    # TODO enhance with int conversion or from distutils.version import LooseVersion
    # or with list(map(int, version.split(".")))
    if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
        raise LcmException(
            "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version
            )
        )
    # check version of common
    if versiontuple(common_version) < versiontuple(min_common_version):
        raise LcmException(
            "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version
            )
        )

    try:
        self.db = Database(self.main_config.to_dict()).instance.db

        self.fs = Filesystem(self.main_config.to_dict()).instance.fs
        self.fs.sync()

        # copy message configuration in order to remove 'group_id' for msg_admin
        config_message = self.main_config.message.to_dict()
        config_message["loop"] = asyncio.get_event_loop()
        msg_drivers = {
            "local": msglocal.MsgLocal,
            "kafka": msgkafka.MsgKafka,
        }
        msg_class = msg_drivers.get(config_message["driver"])
        if msg_class is None:
            raise LcmException(
                "Invalid configuration param '{}' at '[message]':'driver'".format(
                    self.main_config.message.driver
                )
            )
        # self.msg connects with the complete configuration ...
        self.msg = msg_class()
        self.msg.connect(config_message)
        # ... while self.msg_admin connects once 'group_id' has been removed
        self.msg_admin = msg_class()
        config_message.pop("group_id", None)
        self.msg_admin.connect(config_message)
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        # keep the original exception as the explicit cause
        raise LcmException(str(e)) from e

    # contains created tasks/futures to be able to cancel
    self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
kuused124bfe2019-06-18 12:09:24 +0200195
async def check_RO_version(self):
    """
    Find out which RO is available and check that its version is supported.
    On every attempt the new-generation RO is tried first (NgRoClient, uri suffix 'ro'); if that fails with
    any exception the old RO is tried (ROClient, uri suffix 'openmano'). self.main_config.RO.ng is set to
    True or False depending on which one answered, and self.main_config.RO.uri keeps the suffixed uri.
    When ROClientException or NgRoException is raised the uri is restored and the attempt is repeated after
    5 seconds, up to 14 attempts.
    :return: None
    :raises LcmException: if the RO version is lower than min_RO_version, or if RO cannot be contacted after
        all the attempts
    """
    tries = 14
    last_error = None
    while True:
        ro_uri = self.main_config.RO.uri or ""
        try:
            # try new RO, if fail old RO
            try:
                self.main_config.RO.uri = ro_uri + "ro"
                ro_server = NgRoClient(**self.main_config.RO.to_dict())
                ro_version = await ro_server.get_version()
                self.main_config.RO.ng = True
            except Exception:
                self.main_config.RO.uri = ro_uri + "openmano"
                ro_server = ROClient(**self.main_config.RO.to_dict())
                ro_version = await ro_server.get_version()
                self.main_config.RO.ng = False
            if versiontuple(ro_version) < versiontuple(min_RO_version):
                raise LcmException(
                    "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version
                    )
                )
            self.logger.info(
                "Connected to RO version {} new-generation version {}".format(
                    ro_version, self.main_config.RO.ng
                )
            )
            return
        except (ROClientException, NgRoException) as e:
            # restore the uri without suffix, so that the next attempt starts clean
            self.main_config.RO.uri = ro_uri
            tries -= 1
            traceback.print_tb(e.__traceback__)
            error_text = "Error while connecting to RO on {}: {}".format(
                self.main_config.RO.uri, e
            )
            if tries <= 0:
                self.logger.critical(error_text)
                # keep the last client error as the explicit cause
                raise LcmException(error_text) from e
            # the same error is logged only once
            if last_error != error_text:
                last_error = error_text
                self.logger.error(
                    error_text + ". Waiting until {} seconds".format(5 * tries)
                )
            await asyncio.sleep(5)
tierno22f4f9c2018-06-11 18:53:39 +0200243
tiernoc0e42e22018-05-11 11:36:10 +0200244 async def test(self, param=None):
245 self.logger.debug("Starting/Ending test task: {}".format(param))
246
async def kafka_ping(self):
    """
    Endless task that writes a 'ping' at topic 'admin' addressed to this same worker, and checks that the
    pings come back. self.pings_not_received is increased after every ping sent; kafka_read_callback sets it
    to 0 when a ping of this worker is read.
    Pings are sent every ping_interval_boot seconds until one has come back, and every ping_interval_pace
    seconds afterwards.
    :return: it does not return normally
    :raises LcmException: if more than 10 pings in a row have not come back
    :raises Exception: the last error, once too many consecutive errors have happened (30 before the first
        successful iteration, 8 afterwards)
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg_admin.aiowrite(
                "admin",
                "ping",
                {
                    "from": "lcm",
                    "to": "lcm",
                    "worker_id": self.worker_id,
                    "version": lcm_version,
                },
            )
            # time between pings are low when it is not received and at starting
            wait_time = (
                self.ping_interval_boot
                if not kafka_has_received
                else self.ping_interval_pace
            )
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # if first_start it is the first time after starting. So leave more time and wait
            # to allow kafka starts.
            # The limit must be selected before comparing: 'a == 8 if cond else 30' is parsed as
            # '(a == 8) if cond else 30', which is always true (30) while starting
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error(
                    "Task kafka_ping task exit error too many errors. Exception: {}".format(
                        e
                    )
                )
                raise
            consecutive_errors += 1
            self.logger.error(
                "Task kafka_ping retrying after Exception {}".format(e)
            )
            wait_time = 2 if not first_start else 5
            await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200297
async def kafka_read_callback(self, topic, command, params):
    """
    Process one message of the bus: launch the task of the requested operation and register it at
    self.lcm_tasks, or ignore the message when it is just a notification.
    Commands 'exit', 'echo', 'test' and the ones starting with '#' are attended for any topic. A topic/command
    that is not attended here is logged as critical.
    :param topic: topic of the message: admin, nslcmops, pla, k8scluster, vca, k8srepo, ns, nsi, vim_account,
        wim_account or sdn
    :param command: command (key) of the message
    :param params: content of the message. Normally a dictionary with '_id'; for 'show' it is the id itself
    :return: None
    :raises LcmExceptionExit: when command is 'exit'

    NOTE(review): this method was rebuilt from a listing where the indentation was lost. The places where the
    position of a statement could not be determined are marked with NOTE(review) - to be confirmed.
    """
    # It is a constant: it is local to every call, so every task launched here with an order_id
    # is registered with 2
    order_id = 2

    if topic != "admin" and command != "ping":
        self.logger.debug(
            "Task kafka_read receives {} {}: {}".format(topic, command, params)
        )
    self.consecutive_errors = 0
    self.first_start = False

    # commands attended whatever the topic is
    if command == "exit":
        raise LcmExceptionExit
    if command.startswith("#"):
        return
    if command == "echo":
        # just for test
        print(params)
        sys.stdout.flush()
        return
    if command == "test":
        # ensure_future as in the rest of this method, instead of instantiating asyncio.Task
        asyncio.ensure_future(self.test(params))
        return

    # 'created', 'edited', 'deleted' are attended in the same way as 'create', 'edit', 'delete'
    # at the topics of the two tables below
    operation = {"created": "create", "edited": "edit", "deleted": "delete"}.get(
        command, command
    )
    # topic -> operations attended. The handler is the attribute of self named as the topic
    entity_operations = {
        "k8scluster": ("create", "edit", "delete"),
        "vca": ("create", "edit", "delete"),
        "k8srepo": ("create", "delete"),
    }
    # topic -> (attribute of self with the handler, also used as prefix of the task name;
    #           operations that are not launched when self.main_config.RO.ng is set;
    #           True if 'show' is attended)
    account_topics = {
        "vim_account": ("vim", ("create", "edit"), True),
        "wim_account": ("wim", ("create",), True),
        "sdn": ("sdn", ("create",), False),
    }

    if topic == "admin":
        if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
            # pings of other workers are ignored
            if params.get("worker_id") != self.worker_id:
                return
            self.pings_not_received = 0
            try:
                with open(self.health_check_file, "w") as f:
                    f.write(str(time()))
            except Exception as e:
                self.logger.error(
                    "Cannot write into '{}' for healthcheck: {}".format(
                        self.health_check_file, e
                    )
                )
        # NOTE(review): return placed at topic level, so other admin messages are silently ignored
        return
    elif topic == "nslcmops":
        if command == "cancel":
            nslcmop_id = params["_id"]
            self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
            nsr_id = params["nsInstanceId"]
            # cancel the tasks and wait
            for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
                try:
                    await task
                    self.logger.debug(
                        "Cancelled task ended {},{},{}".format(
                            nsr_id, nslcmop_id, task
                        )
                    )
                except asyncio.CancelledError:
                    self.logger.debug(
                        "Task already cancelled and finished {},{},{}".format(
                            nsr_id, nslcmop_id, task
                        )
                    )
            # update DB
            # NOTE(review): done once, after waiting for all the tasks
            self.db.set_one(
                "nslcmops",
                q_filter={"_id": nslcmop_id},
                update_dict={
                    "operationState": "FAILED_TEMP",
                    "isCancelPending": False,
                },
                fail_on_empty=False,
                unset={"cancelMode": None},
            )
            self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
        # NOTE(review): return placed at topic level
        return
    elif topic == "pla":
        if command == "placement":
            self.ns.update_nsrs_with_pla_result(params)
        # NOTE(review): return placed at topic level
        return
    elif topic in entity_operations:
        if operation in entity_operations[topic]:
            entity_id = params.get("_id")
            if topic == "k8srepo" and operation == "create":
                self.logger.debug("k8srepo_id = {}".format(entity_id))
            handler = getattr(self, topic)
            task = asyncio.ensure_future(
                getattr(handler, operation)(params, order_id)
            )
            self.lcm_tasks.register(
                topic, entity_id, order_id, "{}_{}".format(topic, operation), task
            )
            return
    elif topic == "ns":
        if command in (
            "instantiate",
            "terminate",
            "vca_status_refresh",
            "action",
            "update",
            "scale",
            "heal",
            "migrate",
        ):
            # the method of self.ns has the same name as the command
            nslcmop = params
            nslcmop_id = nslcmop["_id"]
            nsr_id = nslcmop["nsInstanceId"]
            if command == "terminate":
                # cancel whatever is running over this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
            task = asyncio.ensure_future(
                getattr(self.ns, command)(nsr_id, nslcmop_id)
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_" + command, task)
            return
        elif command == "show":
            nsr_id = params
            try:
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                print(
                    "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                    "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                    "".format(
                        nsr_id,
                        db_nsr["operational-status"],
                        db_nsr["config-status"],
                        db_nsr["detailed-status"],
                        db_nsr["_admin"]["deployed"],
                        self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                    )
                )
            except Exception as e:
                print("nsr {} not found: {}".format(nsr_id, e))
            sys.stdout.flush()
            return
        elif command == "deleted":
            return  # TODO cleaning of task just in case should be done
        elif command in (
            "vnf_terminated",
            "policy_updated",
            "terminated",
            "instantiated",
            "scaled",
            "healed",
            "actioned",
            "updated",
            "migrated",
        ):  # "scaled-cooldown-time"
            return
    elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
        if command in ("instantiate", "terminate"):
            nsilcmop = params
            nsilcmop_id = nsilcmop["_id"]  # slice operation id
            nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
            if command == "terminate":
                self.lcm_tasks.cancel(topic, nsir_id)
            task = asyncio.ensure_future(
                getattr(self.netslice, command)(nsir_id, nsilcmop_id)
            )
            self.lcm_tasks.register(
                "nsi", nsir_id, nsilcmop_id, "nsi_" + command, task
            )
            return
        elif command == "show":
            nsir_id = params
            try:
                db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                print(
                    "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                    "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                    "".format(
                        nsir_id,
                        db_nsir["operational-status"],
                        db_nsir["config-status"],
                        db_nsir["detailed-status"],
                        db_nsir["_admin"]["deployed"],
                        self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                    )
                )
            except Exception as e:
                print("nsir {} not found: {}".format(nsir_id, e))
            sys.stdout.flush()
            return
        elif command == "deleted":
            return  # TODO cleaning of task just in case should be done
        elif command in (
            "terminated",
            "instantiated",
            "scaled",
            "healed",
            "actioned",
        ):  # "scaled-cooldown-time"
            return
    elif topic in account_topics:
        handler_name, skipped_with_ng, has_show = account_topics[topic]
        account_id = params["_id"]
        if operation in ("create", "edit"):
            if not (operation in skipped_with_ng and self.main_config.RO.ng):
                handler = getattr(self, handler_name)
                task = asyncio.ensure_future(
                    getattr(handler, operation)(params, order_id)
                )
                self.lcm_tasks.register(
                    topic,
                    account_id,
                    order_id,
                    "{}_{}".format(handler_name, operation),
                    task,
                )
            return
        elif operation == "delete":
            # 'deleted' is also attended here
            self.lcm_tasks.cancel(topic, account_id)
            handler = getattr(self, handler_name)
            task = asyncio.ensure_future(handler.delete(params, order_id))
            self.lcm_tasks.register(
                topic, account_id, order_id, handler_name + "_delete", task
            )
            return
        elif command == "show" and has_show:
            print("not implemented show with {}".format(topic))
            sys.stdout.flush()
            return
    self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
671
async def kafka_read(self):
    """
    Read the LCM kafka topics, retrying when the read fails.

    Resets self.consecutive_errors to 0 and self.first_start to True, then awaits
    self.msg.aioread (regular topics, from the beginning) and self.msg_admin.aioread
    ("admin" topic, group_id=False) together, both dispatching to
    self.kafka_read_callback.

    On LcmExceptionExit the loop ends normally. On any other exception the read is
    retried after a pause (5 seconds while self.first_start is true, 2 seconds
    otherwise) until the number of consecutive errors reaches the limit (30 while
    self.first_start is true, 8 otherwise); then the exception is logged and re-raised.

    :return: None
    """
    self.logger.debug(
        "Task kafka_read Enter with worker_id={}".format(self.worker_id)
    )
    self.consecutive_errors = 0
    self.first_start = True

    # Loop invariants: the topic lists never change between retries.
    topics = (
        "ns",
        "vim_account",
        "wim_account",
        "sdn",
        "nsi",
        "k8scluster",
        "vca",
        "k8srepo",
        "pla",
        "nslcmops",
    )
    topics_admin = ("admin",)

    # The loop ends only through LcmExceptionExit (break) or the re-raise below.
    # The former "consecutive_errors < 10" bound was lower than the first-start
    # limit of 30 and would have ended the task silently without raising.
    while True:
        try:
            await asyncio.gather(
                self.msg.aioread(
                    topics,
                    aiocallback=self.kafka_read_callback,
                    from_beginning=True,
                ),
                self.msg_admin.aioread(
                    topics_admin,
                    aiocallback=self.kafka_read_callback,
                    group_id=False,
                ),
            )

        except LcmExceptionExit:
            self.logger.debug("Bye!")
            break
        except Exception as e:
            # While first_start is still set nothing has been read since starting,
            # so allow more retries (and a longer pause) to let kafka come up.
            # The limit is computed on its own line: written inline as
            # "errors == 8 if not first_start else 30" the conditional expression
            # binds looser than "==", making the test always true on first start.
            max_errors = 30 if self.first_start else 8
            if self.consecutive_errors >= max_errors:
                self.logger.error(
                    "Task kafka_read task exit error too many errors. Exception: {}".format(
                        e
                    )
                )
                raise
            self.consecutive_errors += 1
            self.logger.error(
                "Task kafka_read retrying after Exception {}".format(e)
            )
            wait_time = 5 if self.first_start else 2
            await asyncio.sleep(wait_time)

    self.logger.debug("Task kafka_read exit")
727
async def kafka_read_ping(self):
    """
    Run the kafka reader and the kafka ping tasks together.

    Awaits self.kafka_read() and self.kafka_ping() through asyncio.gather.

    :return: None
    """
    reader = self.kafka_read()
    pinger = self.kafka_ping()
    await asyncio.gather(reader, pinger)
730
async def start(self):
    """
    Start the LCM: check RO, create the per-topic handlers and process kafka messages.

    Steps:
      1. await self.check_RO_version()
      2. create the handler objects self.ns, self.netslice, self.vim, self.wim,
         self.sdn, self.k8scluster, self.vca and self.k8srepo
      3. await self.kafka_read_ping() until it finishes
      4. disconnect self.db, self.msg, self.msg_admin and self.fs (those that are set)

    Step 4 runs in a "finally" clause, so the connections are also released when
    kafka_read_ping raises or the task is cancelled; the exception still propagates.

    :return: None
    """
    # check RO version
    await self.check_RO_version()

    self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
    # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
    self.netslice = netslice.NetsliceLcm(
        self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
    )
    self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
    self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
    self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
    self.k8scluster = vim_sdn.K8sClusterLcm(
        self.msg, self.lcm_tasks, self.main_config.to_dict()
    )
    self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
    self.k8srepo = vim_sdn.K8sRepoLcm(
        self.msg, self.lcm_tasks, self.main_config.to_dict()
    )

    try:
        await self.kafka_read_ping()
    finally:
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")

        # Release the connections whether kafka_read_ping ended normally or not.
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
771
def read_config_file(self, config_file):
    """
    Load a YAML configuration file.

    If the file cannot be opened or parsed, a critical message is logged through
    self.logger and the process exits with status 1.

    :param config_file: path of the YAML file to read
    :return: the object returned by yaml.safe_load for the file content
    """
    try:
        with open(config_file) as f:
            return yaml.safe_load(f)
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        # sys.exit instead of the builtin exit(): the latter is a helper injected by
        # the "site" module for interactive use and is not guaranteed to exist.
        sys.exit(1)
tiernoc0e42e22018-05-11 11:36:10 +0200779
@staticmethod
def get_process_id():
    """
    Build an identifier for this process.

    The first line of /proc/self/cgroup is read and the text after its last "/"
    (newline removed, cut to 12 characters) is used; this is meant to be the docker
    ID when running inside docker. If that file cannot be read, or the resulting
    text is empty, 12 random hexadecimal characters are returned instead.
    :return: Obtained ID
    """

    def read_cgroup_id():
        # Any failure (missing file, permissions, ...) just means "no docker id".
        try:
            with open("/proc/self/cgroup", "r") as cgroup_file:
                first_line = cgroup_file.readline()
                last_component = first_line.rsplit("/", 1)[-1]
                return last_component.replace("\n", "")[:12]
        except Exception:
            return None

    def random_hex_id():
        hex_digits = "0123456789abcdef"
        chosen = []
        for _ in range(12):
            chosen.append(SystemRandom().choice(hex_digits))
        return "".join(chosen)

    # An empty string or None from the cgroup lookup falls back to a random id
    return read_cgroup_id() or random_hex_id()
tierno16427352019-04-22 11:37:36 +0000803
tiernoc0e42e22018-05-11 11:36:10 +0200804
def usage():
    """
    Print the command line help to standard output.

    The program name shown is taken from sys.argv[0].
    """
    help_template = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """
    program_name = sys.argv[0]
    print(help_template.format(program_name))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
tierno275411e2018-05-16 14:33:32 +0200817
818
if __name__ == "__main__":
    # Script entry point: parse the command line, locate the configuration file,
    # then create the Lcm object and run it until it finishes.
    try:
        # load parameters and configuration
        # Accepted options:
        #   -h | --help
        #   -c value | --config value
        #   --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                # Options are handled in command-line order, so config_file here is
                # only set if -c/--config appeared before --health-check.
                health_check(config_file, Lcm.ping_interval_pace)
            else:
                print(f"Unhandled option: {o}")
                sys.exit(1)

        if config_file:
            if not os.path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                sys.exit(1)
        else:
            # No file given: try a ".cfg" next to this script, then the current
            # folder, then /etc/osm. The for/else reports when none of them exists.
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if os.path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                sys.exit(1)
        config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file)))
        lcm = Lcm(config_file)
        asyncio.run(lcm.start())
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)