blob: 9b62d82eef07a337eebb1445c675f0de256919ff [file] [log] [blame]
tiernoc0e42e22018-05-11 11:36:10 +02001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
tierno2e215512018-11-28 09:37:52 +00004##
5# Copyright 2018 Telefonica S.A.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18##
19
quilesj7e13aeb2019-10-08 13:34:55 +020020
21# DEBUG WITH PDB
quilesj7e13aeb2019-10-08 13:34:55 +020022import pdb
23
tiernoc0e42e22018-05-11 11:36:10 +020024import asyncio
25import yaml
tierno275411e2018-05-16 14:33:32 +020026import logging
27import logging.handlers
28import getopt
tierno275411e2018-05-16 14:33:32 +020029import sys
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050030from random import SystemRandom
tierno59d22d22018-09-25 18:10:19 +020031
bravof73bac502021-05-11 07:38:47 -040032from osm_lcm import ns, vim_sdn, netslice
tierno69f0d382020-05-07 13:08:09 +000033from osm_lcm.ng_ro import NgRoException, NgRoClient
34from osm_lcm.ROclient import ROClient, ROClientException
quilesj7e13aeb2019-10-08 13:34:55 +020035
tierno94f06112020-02-11 12:38:19 +000036from time import time
tierno8069ce52019-08-28 15:34:33 +000037from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
tiernoa4dea5a2020-01-05 16:29:30 +000038from osm_lcm import version as lcm_version, version_date as lcm_version_date
tierno8069ce52019-08-28 15:34:33 +000039
bravof922c4172020-11-24 21:21:43 -030040from osm_common import msglocal, msgkafka
tierno98768132018-09-11 12:07:21 +020041from osm_common import version as common_version
tierno59d22d22018-09-25 18:10:19 +020042from osm_common.dbbase import DbException
tiernoc0e42e22018-05-11 11:36:10 +020043from osm_common.fsbase import FsException
44from osm_common.msgbase import MsgException
bravof922c4172020-11-24 21:21:43 -030045from osm_lcm.data_utils.database.database import Database
46from osm_lcm.data_utils.filesystem.filesystem import Filesystem
Luis Vegaa27dc532022-11-11 20:10:49 +000047from osm_lcm.data_utils.lcm_config import LcmCfg
aticig56b86c22022-06-29 10:43:05 +030048from osm_lcm.lcm_hc import get_health_check_file
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050049from os import path, getenv
tierno59d22d22018-09-25 18:10:19 +020050from n2vc import version as n2vc_version
bravof922c4172020-11-24 21:21:43 -030051import traceback
tiernoc0e42e22018-05-11 11:36:10 +020052
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050053if getenv("OSMLCM_PDB_DEBUG", None) is not None:
quilesj7e13aeb2019-10-08 13:34:55 +020054 pdb.set_trace()
55
tiernoc0e42e22018-05-11 11:36:10 +020056
tierno275411e2018-05-16 14:33:32 +020057__author__ = "Alfonso Tierno"
tiernoe64f7fb2019-09-11 08:55:52 +000058min_RO_version = "6.0.2"
tierno6e9d2eb2018-09-12 17:47:18 +020059min_n2vc_version = "0.0.2"
quilesj7e13aeb2019-10-08 13:34:55 +020060
tierno16427352019-04-22 11:37:36 +000061min_common_version = "0.1.19"
tierno275411e2018-05-16 14:33:32 +020062
63
tiernoc0e42e22018-05-11 11:36:10 +020064class Lcm:
garciadeblas5697b8b2021-03-24 09:17:02 +010065 ping_interval_pace = (
66 120 # how many time ping is send once is confirmed all is running
67 )
68 ping_interval_boot = 5 # how many time ping is sent when booting
Luis Vegaa27dc532022-11-11 20:10:49 +000069
70 main_config = LcmCfg()
tiernoa9843d82018-10-24 10:44:20 +020071
Gabriel Cubae7898982023-05-11 01:57:21 -050072 def __init__(self, config_file):
tiernoc0e42e22018-05-11 11:36:10 +020073 """
74 Init, Connect to database, filesystem storage, and messaging
75 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
76 :return: None
77 """
tiernoc0e42e22018-05-11 11:36:10 +020078 self.db = None
79 self.msg = None
tierno16427352019-04-22 11:37:36 +000080 self.msg_admin = None
tiernoc0e42e22018-05-11 11:36:10 +020081 self.fs = None
82 self.pings_not_received = 1
tiernoc2564fe2019-01-28 16:18:56 +000083 self.consecutive_errors = 0
84 self.first_start = False
tiernoc0e42e22018-05-11 11:36:10 +020085
tiernoc0e42e22018-05-11 11:36:10 +020086 # logging
garciadeblas5697b8b2021-03-24 09:17:02 +010087 self.logger = logging.getLogger("lcm")
tierno16427352019-04-22 11:37:36 +000088 # get id
89 self.worker_id = self.get_process_id()
tiernoc0e42e22018-05-11 11:36:10 +020090 # load configuration
91 config = self.read_config_file(config_file)
Luis Vegaa27dc532022-11-11 20:10:49 +000092 self.main_config.set_from_dict(config)
93 self.main_config.transform()
94 self.main_config.load_from_env()
95 self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
96 # TODO: check if lcm_hc.py is necessary
97 self.health_check_file = get_health_check_file(self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +010098 self.ns = (
99 self.netslice
100 ) = (
101 self.vim
102 ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
tiernoc0e42e22018-05-11 11:36:10 +0200103
104 # logging
garciadeblas5697b8b2021-03-24 09:17:02 +0100105 log_format_simple = (
106 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
107 )
108 log_formatter_simple = logging.Formatter(
109 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
110 )
Luis Vegaa27dc532022-11-11 20:10:49 +0000111 if self.main_config.globalConfig.logfile:
garciadeblas5697b8b2021-03-24 09:17:02 +0100112 file_handler = logging.handlers.RotatingFileHandler(
Luis Vegaa27dc532022-11-11 20:10:49 +0000113 self.main_config.globalConfig.logfile,
114 maxBytes=100e6,
115 backupCount=9,
116 delay=0,
garciadeblas5697b8b2021-03-24 09:17:02 +0100117 )
tiernoc0e42e22018-05-11 11:36:10 +0200118 file_handler.setFormatter(log_formatter_simple)
119 self.logger.addHandler(file_handler)
Luis Vegaa27dc532022-11-11 20:10:49 +0000120 if not self.main_config.globalConfig.to_dict()["nologging"]:
tiernoc0e42e22018-05-11 11:36:10 +0200121 str_handler = logging.StreamHandler()
122 str_handler.setFormatter(log_formatter_simple)
123 self.logger.addHandler(str_handler)
124
Luis Vegaa27dc532022-11-11 20:10:49 +0000125 if self.main_config.globalConfig.to_dict()["loglevel"]:
126 self.logger.setLevel(self.main_config.globalConfig.loglevel)
tiernoc0e42e22018-05-11 11:36:10 +0200127
128 # logging other modules
Luis Vegaa27dc532022-11-11 20:10:49 +0000129 for logger in ("message", "database", "storage", "tsdb"):
130 logger_config = self.main_config.to_dict()[logger]
131 logger_module = logging.getLogger(logger_config["logger_name"])
132 if logger_config["logfile"]:
garciadeblas5697b8b2021-03-24 09:17:02 +0100133 file_handler = logging.handlers.RotatingFileHandler(
Luis Vegaa27dc532022-11-11 20:10:49 +0000134 logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
garciadeblas5697b8b2021-03-24 09:17:02 +0100135 )
tiernoc0e42e22018-05-11 11:36:10 +0200136 file_handler.setFormatter(log_formatter_simple)
137 logger_module.addHandler(file_handler)
Luis Vegaa27dc532022-11-11 20:10:49 +0000138 if logger_config["loglevel"]:
139 logger_module.setLevel(logger_config["loglevel"])
garciadeblas5697b8b2021-03-24 09:17:02 +0100140 self.logger.critical(
141 "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
142 )
tierno59d22d22018-09-25 18:10:19 +0200143
tiernoc0e42e22018-05-11 11:36:10 +0200144 # check version of N2VC
145 # TODO enhance with int conversion or from distutils.version import LooseVersion
146 # or with list(map(int, version.split(".")))
tierno59d22d22018-09-25 18:10:19 +0200147 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100148 raise LcmException(
149 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
150 n2vc_version, min_n2vc_version
151 )
152 )
tierno59d22d22018-09-25 18:10:19 +0200153 # check version of common
tierno27246d82018-09-27 15:59:09 +0200154 if versiontuple(common_version) < versiontuple(min_common_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100155 raise LcmException(
156 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
157 common_version, min_common_version
158 )
159 )
tierno22f4f9c2018-06-11 18:53:39 +0200160
tiernoc0e42e22018-05-11 11:36:10 +0200161 try:
Luis Vegaa27dc532022-11-11 20:10:49 +0000162 self.db = Database(self.main_config.to_dict()).instance.db
tiernoc0e42e22018-05-11 11:36:10 +0200163
Luis Vegaa27dc532022-11-11 20:10:49 +0000164 self.fs = Filesystem(self.main_config.to_dict()).instance.fs
sousaedu40365e82021-07-26 15:24:21 +0200165 self.fs.sync()
tiernoc0e42e22018-05-11 11:36:10 +0200166
quilesj7e13aeb2019-10-08 13:34:55 +0200167 # copy message configuration in order to remove 'group_id' for msg_admin
Luis Vegaa27dc532022-11-11 20:10:49 +0000168 config_message = self.main_config.message.to_dict()
Gabriel Cubae7898982023-05-11 01:57:21 -0500169 config_message["loop"] = asyncio.get_event_loop()
tiernoc2564fe2019-01-28 16:18:56 +0000170 if config_message["driver"] == "local":
tiernoc0e42e22018-05-11 11:36:10 +0200171 self.msg = msglocal.MsgLocal()
tiernoc2564fe2019-01-28 16:18:56 +0000172 self.msg.connect(config_message)
tierno16427352019-04-22 11:37:36 +0000173 self.msg_admin = msglocal.MsgLocal()
174 config_message.pop("group_id", None)
175 self.msg_admin.connect(config_message)
tiernoc2564fe2019-01-28 16:18:56 +0000176 elif config_message["driver"] == "kafka":
tiernoc0e42e22018-05-11 11:36:10 +0200177 self.msg = msgkafka.MsgKafka()
tiernoc2564fe2019-01-28 16:18:56 +0000178 self.msg.connect(config_message)
tierno16427352019-04-22 11:37:36 +0000179 self.msg_admin = msgkafka.MsgKafka()
180 config_message.pop("group_id", None)
181 self.msg_admin.connect(config_message)
tiernoc0e42e22018-05-11 11:36:10 +0200182 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100183 raise LcmException(
184 "Invalid configuration param '{}' at '[message]':'driver'".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000185 self.main_config.message.driver
garciadeblas5697b8b2021-03-24 09:17:02 +0100186 )
187 )
tiernoc0e42e22018-05-11 11:36:10 +0200188 except (DbException, FsException, MsgException) as e:
189 self.logger.critical(str(e), exc_info=True)
190 raise LcmException(str(e))
191
kuused124bfe2019-06-18 12:09:24 +0200192 # contains created tasks/futures to be able to cancel
bravof922c4172020-11-24 21:21:43 -0300193 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
kuused124bfe2019-06-18 12:09:24 +0200194
tierno22f4f9c2018-06-11 18:53:39 +0200195 async def check_RO_version(self):
tiernoe64f7fb2019-09-11 08:55:52 +0000196 tries = 14
197 last_error = None
198 while True:
Luis Vegaa27dc532022-11-11 20:10:49 +0000199 ro_uri = self.main_config.RO.uri
200 if not ro_uri:
201 ro_uri = ""
tiernoe64f7fb2019-09-11 08:55:52 +0000202 try:
tierno2357f4e2020-10-19 16:38:59 +0000203 # try new RO, if fail old RO
204 try:
Luis Vegaa27dc532022-11-11 20:10:49 +0000205 self.main_config.RO.uri = ro_uri + "ro"
Gabriel Cubae7898982023-05-11 01:57:21 -0500206 ro_server = NgRoClient(**self.main_config.RO.to_dict())
tierno2357f4e2020-10-19 16:38:59 +0000207 ro_version = await ro_server.get_version()
Luis Vegaa27dc532022-11-11 20:10:49 +0000208 self.main_config.RO.ng = True
tierno2357f4e2020-10-19 16:38:59 +0000209 except Exception:
Luis Vegaa27dc532022-11-11 20:10:49 +0000210 self.main_config.RO.uri = ro_uri + "openmano"
Gabriel Cubae7898982023-05-11 01:57:21 -0500211 ro_server = ROClient(**self.main_config.RO.to_dict())
tierno2357f4e2020-10-19 16:38:59 +0000212 ro_version = await ro_server.get_version()
Luis Vegaa27dc532022-11-11 20:10:49 +0000213 self.main_config.RO.ng = False
tiernoe64f7fb2019-09-11 08:55:52 +0000214 if versiontuple(ro_version) < versiontuple(min_RO_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100215 raise LcmException(
216 "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
217 ro_version, min_RO_version
218 )
219 )
220 self.logger.info(
221 "Connected to RO version {} new-generation version {}".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000222 ro_version, self.main_config.RO.ng
garciadeblas5697b8b2021-03-24 09:17:02 +0100223 )
224 )
tiernoe64f7fb2019-09-11 08:55:52 +0000225 return
tierno69f0d382020-05-07 13:08:09 +0000226 except (ROClientException, NgRoException) as e:
Luis Vegaa27dc532022-11-11 20:10:49 +0000227 self.main_config.RO.uri = ro_uri
tiernoe64f7fb2019-09-11 08:55:52 +0000228 tries -= 1
bravof922c4172020-11-24 21:21:43 -0300229 traceback.print_tb(e.__traceback__)
garciadeblas5697b8b2021-03-24 09:17:02 +0100230 error_text = "Error while connecting to RO on {}: {}".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000231 self.main_config.RO.uri, e
garciadeblas5697b8b2021-03-24 09:17:02 +0100232 )
tiernoe64f7fb2019-09-11 08:55:52 +0000233 if tries <= 0:
234 self.logger.critical(error_text)
235 raise LcmException(error_text)
236 if last_error != error_text:
237 last_error = error_text
garciadeblas5697b8b2021-03-24 09:17:02 +0100238 self.logger.error(
239 error_text + ". Waiting until {} seconds".format(5 * tries)
240 )
tiernoe64f7fb2019-09-11 08:55:52 +0000241 await asyncio.sleep(5)
tierno22f4f9c2018-06-11 18:53:39 +0200242
tiernoc0e42e22018-05-11 11:36:10 +0200243 async def test(self, param=None):
244 self.logger.debug("Starting/Ending test task: {}".format(param))
245
tiernoc0e42e22018-05-11 11:36:10 +0200246 async def kafka_ping(self):
247 self.logger.debug("Task kafka_ping Enter")
248 consecutive_errors = 0
249 first_start = True
250 kafka_has_received = False
251 self.pings_not_received = 1
252 while True:
253 try:
tierno16427352019-04-22 11:37:36 +0000254 await self.msg_admin.aiowrite(
garciadeblas5697b8b2021-03-24 09:17:02 +0100255 "admin",
256 "ping",
257 {
258 "from": "lcm",
259 "to": "lcm",
260 "worker_id": self.worker_id,
261 "version": lcm_version,
262 },
garciadeblas5697b8b2021-03-24 09:17:02 +0100263 )
tiernoc0e42e22018-05-11 11:36:10 +0200264 # time between pings are low when it is not received and at starting
garciadeblas5697b8b2021-03-24 09:17:02 +0100265 wait_time = (
266 self.ping_interval_boot
267 if not kafka_has_received
268 else self.ping_interval_pace
269 )
tiernoc0e42e22018-05-11 11:36:10 +0200270 if not self.pings_not_received:
271 kafka_has_received = True
272 self.pings_not_received += 1
Gabriel Cubae7898982023-05-11 01:57:21 -0500273 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200274 if self.pings_not_received > 10:
275 raise LcmException("It is not receiving pings from Kafka bus")
276 consecutive_errors = 0
277 first_start = False
278 except LcmException:
279 raise
280 except Exception as e:
281 # if not first_start is the first time after starting. So leave more time and wait
282 # to allow kafka starts
283 if consecutive_errors == 8 if not first_start else 30:
garciadeblas5697b8b2021-03-24 09:17:02 +0100284 self.logger.error(
285 "Task kafka_read task exit error too many errors. Exception: {}".format(
286 e
287 )
288 )
tiernoc0e42e22018-05-11 11:36:10 +0200289 raise
290 consecutive_errors += 1
garciadeblas5697b8b2021-03-24 09:17:02 +0100291 self.logger.error(
292 "Task kafka_read retrying after Exception {}".format(e)
293 )
tierno16427352019-04-22 11:37:36 +0000294 wait_time = 2 if not first_start else 5
Gabriel Cubae7898982023-05-11 01:57:21 -0500295 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200296
Gabriel Cubab6049d32023-10-30 13:44:49 -0500297 async def kafka_read_callback(self, topic, command, params):
gcalvinoed7f6d42018-12-14 14:44:56 +0100298 order_id = 1
299
300 if topic != "admin" and command != "ping":
garciadeblas5697b8b2021-03-24 09:17:02 +0100301 self.logger.debug(
302 "Task kafka_read receives {} {}: {}".format(topic, command, params)
303 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100304 self.consecutive_errors = 0
305 self.first_start = False
306 order_id += 1
307 if command == "exit":
308 raise LcmExceptionExit
309 elif command.startswith("#"):
310 return
311 elif command == "echo":
312 # just for test
313 print(params)
314 sys.stdout.flush()
315 return
316 elif command == "test":
Gabriel Cubae7898982023-05-11 01:57:21 -0500317 asyncio.Task(self.test(params))
gcalvinoed7f6d42018-12-14 14:44:56 +0100318 return
319
320 if topic == "admin":
321 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
tierno16427352019-04-22 11:37:36 +0000322 if params.get("worker_id") != self.worker_id:
323 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100324 self.pings_not_received = 0
tierno3e359b12019-02-03 02:29:13 +0100325 try:
aticig56b86c22022-06-29 10:43:05 +0300326 with open(self.health_check_file, "w") as f:
tierno3e359b12019-02-03 02:29:13 +0100327 f.write(str(time()))
328 except Exception as e:
garciadeblas5697b8b2021-03-24 09:17:02 +0100329 self.logger.error(
330 "Cannot write into '{}' for healthcheck: {}".format(
aticig56b86c22022-06-29 10:43:05 +0300331 self.health_check_file, e
garciadeblas5697b8b2021-03-24 09:17:02 +0100332 )
333 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100334 return
Gabriel Cubab6049d32023-10-30 13:44:49 -0500335 elif topic == "nslcmops":
336 if command == "cancel":
337 nslcmop_id = params["_id"]
338 self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
339 nsr_id = params["nsInstanceId"]
340 # cancel the tasks and wait
341 for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
342 try:
343 await task
344 self.logger.debug(
345 "Cancelled task ended {},{},{}".format(
346 nsr_id, nslcmop_id, task
347 )
348 )
349 except asyncio.CancelledError:
350 self.logger.debug(
351 "Task already cancelled and finished {},{},{}".format(
352 nsr_id, nslcmop_id, task
353 )
354 )
355 # update DB
356 q_filter = {"_id": nslcmop_id}
357 update_dict = {
358 "operationState": "FAILED_TEMP",
359 "isCancelPending": False,
360 }
361 unset_dict = {
362 "cancelMode": None,
363 }
364 self.db.set_one(
365 "nslcmops",
366 q_filter=q_filter,
367 update_dict=update_dict,
368 fail_on_empty=False,
369 unset=unset_dict,
370 )
371 self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
372 return
magnussonle9198bb2020-01-21 13:00:51 +0100373 elif topic == "pla":
374 if command == "placement":
375 self.ns.update_nsrs_with_pla_result(params)
376 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100377 elif topic == "k8scluster":
378 if command == "create" or command == "created":
379 k8scluster_id = params.get("_id")
380 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100381 self.lcm_tasks.register(
382 "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
383 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100384 return
dariofaccin8bbeeb02023-01-23 18:13:27 +0100385 elif command == "edit" or command == "edited":
386 k8scluster_id = params.get("_id")
387 task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
388 self.lcm_tasks.register(
389 "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
390 )
391 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100392 elif command == "delete" or command == "deleted":
393 k8scluster_id = params.get("_id")
394 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100395 self.lcm_tasks.register(
396 "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
397 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100398 return
David Garciac1fe90a2021-03-31 19:12:02 +0200399 elif topic == "vca":
400 if command == "create" or command == "created":
401 vca_id = params.get("_id")
402 task = asyncio.ensure_future(self.vca.create(params, order_id))
403 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
404 return
Dario Faccin8e53c6d2023-01-10 10:38:41 +0000405 elif command == "edit" or command == "edited":
406 vca_id = params.get("_id")
407 task = asyncio.ensure_future(self.vca.edit(params, order_id))
408 self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
409 return
David Garciac1fe90a2021-03-31 19:12:02 +0200410 elif command == "delete" or command == "deleted":
411 vca_id = params.get("_id")
412 task = asyncio.ensure_future(self.vca.delete(params, order_id))
413 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
414 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100415 elif topic == "k8srepo":
416 if command == "create" or command == "created":
417 k8srepo_id = params.get("_id")
418 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
419 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100420 self.lcm_tasks.register(
421 "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
422 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100423 return
424 elif command == "delete" or command == "deleted":
425 k8srepo_id = params.get("_id")
426 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100427 self.lcm_tasks.register(
428 "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
429 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100430 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100431 elif topic == "ns":
tierno307425f2020-01-26 23:35:59 +0000432 if command == "instantiate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100433 # self.logger.debug("Deploying NS {}".format(nsr_id))
434 nslcmop = params
435 nslcmop_id = nslcmop["_id"]
436 nsr_id = nslcmop["nsInstanceId"]
437 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100438 self.lcm_tasks.register(
439 "ns", nsr_id, nslcmop_id, "ns_instantiate", task
440 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100441 return
tierno307425f2020-01-26 23:35:59 +0000442 elif command == "terminate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100443 # self.logger.debug("Deleting NS {}".format(nsr_id))
444 nslcmop = params
445 nslcmop_id = nslcmop["_id"]
446 nsr_id = nslcmop["nsInstanceId"]
447 self.lcm_tasks.cancel(topic, nsr_id)
448 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
449 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
450 return
ksaikiranr3fde2c72021-03-15 10:39:06 +0530451 elif command == "vca_status_refresh":
452 nslcmop = params
453 nslcmop_id = nslcmop["_id"]
454 nsr_id = nslcmop["nsInstanceId"]
garciadeblas5697b8b2021-03-24 09:17:02 +0100455 task = asyncio.ensure_future(
456 self.ns.vca_status_refresh(nsr_id, nslcmop_id)
457 )
458 self.lcm_tasks.register(
459 "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
460 )
ksaikiranr3fde2c72021-03-15 10:39:06 +0530461 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100462 elif command == "action":
463 # self.logger.debug("Update NS {}".format(nsr_id))
464 nslcmop = params
465 nslcmop_id = nslcmop["_id"]
466 nsr_id = nslcmop["nsInstanceId"]
467 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
468 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
469 return
aticigdffa6212022-04-12 15:27:53 +0300470 elif command == "update":
471 # self.logger.debug("Update NS {}".format(nsr_id))
472 nslcmop = params
473 nslcmop_id = nslcmop["_id"]
474 nsr_id = nslcmop["nsInstanceId"]
475 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
476 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
477 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100478 elif command == "scale":
479 # self.logger.debug("Update NS {}".format(nsr_id))
480 nslcmop = params
481 nslcmop_id = nslcmop["_id"]
482 nsr_id = nslcmop["nsInstanceId"]
483 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
484 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
485 return
garciadeblas07f4e4c2022-06-09 09:42:58 +0200486 elif command == "heal":
487 # self.logger.debug("Healing NS {}".format(nsr_id))
488 nslcmop = params
489 nslcmop_id = nslcmop["_id"]
490 nsr_id = nslcmop["nsInstanceId"]
491 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
preethika.p28b0bf82022-09-23 07:36:28 +0000492 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
garciadeblas07f4e4c2022-06-09 09:42:58 +0200493 return
elumalai80bcf1c2022-04-28 18:05:01 +0530494 elif command == "migrate":
495 nslcmop = params
496 nslcmop_id = nslcmop["_id"]
497 nsr_id = nslcmop["nsInstanceId"]
498 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
499 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
500 return
govindarajul4ff4b512022-05-02 20:02:41 +0530501 elif command == "verticalscale":
502 nslcmop = params
503 nslcmop_id = nslcmop["_id"]
504 nsr_id = nslcmop["nsInstanceId"]
505 task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
preethika.p28b0bf82022-09-23 07:36:28 +0000506 self.logger.debug(
507 "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
508 )
509 self.lcm_tasks.register(
510 "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
511 )
512 self.logger.debug(
513 "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
514 )
govindarajul4ff4b512022-05-02 20:02:41 +0530515 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100516 elif command == "show":
tiernoc2564fe2019-01-28 16:18:56 +0000517 nsr_id = params
gcalvinoed7f6d42018-12-14 14:44:56 +0100518 try:
519 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
garciadeblas5697b8b2021-03-24 09:17:02 +0100520 print(
521 "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
522 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
523 "".format(
524 nsr_id,
525 db_nsr["operational-status"],
526 db_nsr["config-status"],
527 db_nsr["detailed-status"],
528 db_nsr["_admin"]["deployed"],
Gabriel Cuba411af2e2023-01-06 17:23:22 -0500529 self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
garciadeblas5697b8b2021-03-24 09:17:02 +0100530 )
531 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100532 except Exception as e:
533 print("nsr {} not found: {}".format(nsr_id, e))
534 sys.stdout.flush()
535 return
536 elif command == "deleted":
537 return # TODO cleaning of task just in case should be done
garciadeblas5697b8b2021-03-24 09:17:02 +0100538 elif command in (
elumalaica7ece02022-04-12 12:47:32 +0530539 "vnf_terminated",
elumalaib9e357c2022-04-27 09:58:38 +0530540 "policy_updated",
garciadeblas5697b8b2021-03-24 09:17:02 +0100541 "terminated",
542 "instantiated",
543 "scaled",
garciadeblas07f4e4c2022-06-09 09:42:58 +0200544 "healed",
garciadeblas5697b8b2021-03-24 09:17:02 +0100545 "actioned",
aticigdffa6212022-04-12 15:27:53 +0300546 "updated",
elumalai80bcf1c2022-04-28 18:05:01 +0530547 "migrated",
govindarajul4ff4b512022-05-02 20:02:41 +0530548 "verticalscaled",
garciadeblas5697b8b2021-03-24 09:17:02 +0100549 ): # "scaled-cooldown-time"
gcalvinoed7f6d42018-12-14 14:44:56 +0100550 return
elumalaica7ece02022-04-12 12:47:32 +0530551
gcalvinoed7f6d42018-12-14 14:44:56 +0100552 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
tierno307425f2020-01-26 23:35:59 +0000553 if command == "instantiate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100554 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
555 nsilcmop = params
556 nsilcmop_id = nsilcmop["_id"] # slice operation id
557 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
garciadeblas5697b8b2021-03-24 09:17:02 +0100558 task = asyncio.ensure_future(
559 self.netslice.instantiate(nsir_id, nsilcmop_id)
560 )
561 self.lcm_tasks.register(
562 "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
563 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100564 return
tierno307425f2020-01-26 23:35:59 +0000565 elif command == "terminate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100566 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
567 nsilcmop = params
568 nsilcmop_id = nsilcmop["_id"] # slice operation id
569 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
570 self.lcm_tasks.cancel(topic, nsir_id)
garciadeblas5697b8b2021-03-24 09:17:02 +0100571 task = asyncio.ensure_future(
572 self.netslice.terminate(nsir_id, nsilcmop_id)
573 )
574 self.lcm_tasks.register(
575 "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
576 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100577 return
578 elif command == "show":
tiernoc2564fe2019-01-28 16:18:56 +0000579 nsir_id = params
gcalvinoed7f6d42018-12-14 14:44:56 +0100580 try:
581 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
garciadeblas5697b8b2021-03-24 09:17:02 +0100582 print(
583 "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
584 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
585 "".format(
586 nsir_id,
587 db_nsir["operational-status"],
588 db_nsir["config-status"],
589 db_nsir["detailed-status"],
590 db_nsir["_admin"]["deployed"],
Gabriel Cuba411af2e2023-01-06 17:23:22 -0500591 self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
garciadeblas5697b8b2021-03-24 09:17:02 +0100592 )
593 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100594 except Exception as e:
595 print("nsir {} not found: {}".format(nsir_id, e))
596 sys.stdout.flush()
597 return
598 elif command == "deleted":
599 return # TODO cleaning of task just in case should be done
garciadeblas5697b8b2021-03-24 09:17:02 +0100600 elif command in (
601 "terminated",
602 "instantiated",
603 "scaled",
garciadeblas07f4e4c2022-06-09 09:42:58 +0200604 "healed",
garciadeblas5697b8b2021-03-24 09:17:02 +0100605 "actioned",
606 ): # "scaled-cooldown-time"
gcalvinoed7f6d42018-12-14 14:44:56 +0100607 return
608 elif topic == "vim_account":
609 vim_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000610 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000611 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000612 task = asyncio.ensure_future(self.vim.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100613 self.lcm_tasks.register(
614 "vim_account", vim_id, order_id, "vim_create", task
615 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100616 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100617 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100618 self.lcm_tasks.cancel(topic, vim_id)
kuuse6a470c62019-07-10 13:52:45 +0200619 task = asyncio.ensure_future(self.vim.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100620 self.lcm_tasks.register(
621 "vim_account", vim_id, order_id, "vim_delete", task
622 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100623 return
624 elif command == "show":
625 print("not implemented show with vim_account")
626 sys.stdout.flush()
627 return
tiernof210c1c2019-10-16 09:09:58 +0000628 elif command in ("edit", "edited"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000629 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000630 task = asyncio.ensure_future(self.vim.edit(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100631 self.lcm_tasks.register(
632 "vim_account", vim_id, order_id, "vim_edit", task
633 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100634 return
tiernof210c1c2019-10-16 09:09:58 +0000635 elif command == "deleted":
636 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100637 elif topic == "wim_account":
638 wim_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000639 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000640 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000641 task = asyncio.ensure_future(self.wim.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100642 self.lcm_tasks.register(
643 "wim_account", wim_id, order_id, "wim_create", task
644 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100645 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100646 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100647 self.lcm_tasks.cancel(topic, wim_id)
kuuse6a470c62019-07-10 13:52:45 +0200648 task = asyncio.ensure_future(self.wim.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100649 self.lcm_tasks.register(
650 "wim_account", wim_id, order_id, "wim_delete", task
651 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100652 return
653 elif command == "show":
654 print("not implemented show with wim_account")
655 sys.stdout.flush()
656 return
tiernof210c1c2019-10-16 09:09:58 +0000657 elif command in ("edit", "edited"):
gcalvinoed7f6d42018-12-14 14:44:56 +0100658 task = asyncio.ensure_future(self.wim.edit(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100659 self.lcm_tasks.register(
660 "wim_account", wim_id, order_id, "wim_edit", task
661 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100662 return
tiernof210c1c2019-10-16 09:09:58 +0000663 elif command == "deleted":
664 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100665 elif topic == "sdn":
666 _sdn_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000667 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000668 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000669 task = asyncio.ensure_future(self.sdn.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100670 self.lcm_tasks.register(
671 "sdn", _sdn_id, order_id, "sdn_create", task
672 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100673 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100674 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100675 self.lcm_tasks.cancel(topic, _sdn_id)
kuuse6a470c62019-07-10 13:52:45 +0200676 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
gcalvinoed7f6d42018-12-14 14:44:56 +0100677 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
678 return
tiernof210c1c2019-10-16 09:09:58 +0000679 elif command in ("edit", "edited"):
gcalvinoed7f6d42018-12-14 14:44:56 +0100680 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
681 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
682 return
tiernof210c1c2019-10-16 09:09:58 +0000683 elif command == "deleted":
684 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100685 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
686
tiernoc0e42e22018-05-11 11:36:10 +0200687 async def kafka_read(self):
garciadeblas5697b8b2021-03-24 09:17:02 +0100688 self.logger.debug(
689 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
690 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100691 self.consecutive_errors = 0
692 self.first_start = True
693 while self.consecutive_errors < 10:
tiernoc0e42e22018-05-11 11:36:10 +0200694 try:
garciadeblas5697b8b2021-03-24 09:17:02 +0100695 topics = (
696 "ns",
697 "vim_account",
698 "wim_account",
699 "sdn",
700 "nsi",
701 "k8scluster",
702 "vca",
703 "k8srepo",
704 "pla",
Gabriel Cubab6049d32023-10-30 13:44:49 -0500705 "nslcmops",
garciadeblas5697b8b2021-03-24 09:17:02 +0100706 )
707 topics_admin = ("admin",)
tierno16427352019-04-22 11:37:36 +0000708 await asyncio.gather(
garciadeblas5697b8b2021-03-24 09:17:02 +0100709 self.msg.aioread(
Gabriel Cubab6049d32023-10-30 13:44:49 -0500710 topics,
711 aiocallback=self.kafka_read_callback,
712 from_beginning=True,
garciadeblas5697b8b2021-03-24 09:17:02 +0100713 ),
714 self.msg_admin.aioread(
715 topics_admin,
Gabriel Cubab6049d32023-10-30 13:44:49 -0500716 aiocallback=self.kafka_read_callback,
garciadeblas5697b8b2021-03-24 09:17:02 +0100717 group_id=False,
718 ),
tierno16427352019-04-22 11:37:36 +0000719 )
tiernoc0e42e22018-05-11 11:36:10 +0200720
gcalvinoed7f6d42018-12-14 14:44:56 +0100721 except LcmExceptionExit:
722 self.logger.debug("Bye!")
723 break
tiernoc0e42e22018-05-11 11:36:10 +0200724 except Exception as e:
725 # if not first_start is the first time after starting. So leave more time and wait
726 # to allow kafka starts
gcalvinoed7f6d42018-12-14 14:44:56 +0100727 if self.consecutive_errors == 8 if not self.first_start else 30:
garciadeblas5697b8b2021-03-24 09:17:02 +0100728 self.logger.error(
729 "Task kafka_read task exit error too many errors. Exception: {}".format(
730 e
731 )
732 )
tiernoc0e42e22018-05-11 11:36:10 +0200733 raise
gcalvinoed7f6d42018-12-14 14:44:56 +0100734 self.consecutive_errors += 1
garciadeblas5697b8b2021-03-24 09:17:02 +0100735 self.logger.error(
736 "Task kafka_read retrying after Exception {}".format(e)
737 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100738 wait_time = 2 if not self.first_start else 5
Gabriel Cubae7898982023-05-11 01:57:21 -0500739 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200740
tiernoc0e42e22018-05-11 11:36:10 +0200741 self.logger.debug("Task kafka_read exit")
742
Gabriel Cubae7898982023-05-11 01:57:21 -0500743 async def kafka_read_ping(self):
744 await asyncio.gather(self.kafka_read(), self.kafka_ping())
745
Mark Beierl1addc932023-05-18 15:11:34 -0400746 async def start(self):
tierno22f4f9c2018-06-11 18:53:39 +0200747 # check RO version
Mark Beierl1addc932023-05-18 15:11:34 -0400748 await self.check_RO_version()
tierno22f4f9c2018-06-11 18:53:39 +0200749
Gabriel Cubae7898982023-05-11 01:57:21 -0500750 self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
Luis Vegaa27dc532022-11-11 20:10:49 +0000751 # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
garciadeblas5697b8b2021-03-24 09:17:02 +0100752 self.netslice = netslice.NetsliceLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -0500753 self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
garciadeblas5697b8b2021-03-24 09:17:02 +0100754 )
Gabriel Cubae7898982023-05-11 01:57:21 -0500755 self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
756 self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
757 self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +0100758 self.k8scluster = vim_sdn.K8sClusterLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -0500759 self.msg, self.lcm_tasks, self.main_config.to_dict()
garciadeblas5697b8b2021-03-24 09:17:02 +0100760 )
Gabriel Cubae7898982023-05-11 01:57:21 -0500761 self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +0100762 self.k8srepo = vim_sdn.K8sRepoLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -0500763 self.msg, self.lcm_tasks, self.main_config.to_dict()
garciadeblas5697b8b2021-03-24 09:17:02 +0100764 )
tierno2357f4e2020-10-19 16:38:59 +0000765
Mark Beierl1addc932023-05-18 15:11:34 -0400766 await self.kafka_read_ping()
bravof73bac502021-05-11 07:38:47 -0400767
tiernoc0e42e22018-05-11 11:36:10 +0200768 # TODO
769 # self.logger.debug("Terminating cancelling creation tasks")
tiernoca2e16a2018-06-29 15:25:24 +0200770 # self.lcm_tasks.cancel("ALL", "create")
tiernoc0e42e22018-05-11 11:36:10 +0200771 # timeout = 200
772 # while self.is_pending_tasks():
773 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
Gabriel Cubae7898982023-05-11 01:57:21 -0500774 # await asyncio.sleep(2)
tiernoc0e42e22018-05-11 11:36:10 +0200775 # timeout -= 2
776 # if not timeout:
tiernoca2e16a2018-06-29 15:25:24 +0200777 # self.lcm_tasks.cancel("ALL", "ALL")
tiernoc0e42e22018-05-11 11:36:10 +0200778 if self.db:
779 self.db.db_disconnect()
780 if self.msg:
781 self.msg.disconnect()
tierno16427352019-04-22 11:37:36 +0000782 if self.msg_admin:
783 self.msg_admin.disconnect()
tiernoc0e42e22018-05-11 11:36:10 +0200784 if self.fs:
785 self.fs.fs_disconnect()
786
tiernoc0e42e22018-05-11 11:36:10 +0200787 def read_config_file(self, config_file):
tiernoc0e42e22018-05-11 11:36:10 +0200788 try:
Gabriel Cubaa89a5a72022-11-26 18:55:15 -0500789 with open(config_file) as f:
790 return yaml.safe_load(f)
tiernoc0e42e22018-05-11 11:36:10 +0200791 except Exception as e:
792 self.logger.critical("At config file '{}': {}".format(config_file, e))
Gabriel Cubaa89a5a72022-11-26 18:55:15 -0500793 exit(1)
tiernoc0e42e22018-05-11 11:36:10 +0200794
tierno16427352019-04-22 11:37:36 +0000795 @staticmethod
796 def get_process_id():
797 """
798 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
799 will provide a random one
800 :return: Obtained ID
801 """
Gabriel Cuba4c0e6802023-10-09 13:22:38 -0500802
803 def get_docker_id():
804 try:
805 with open("/proc/self/cgroup", "r") as f:
806 text_id_ = f.readline()
807 _, _, text_id = text_id_.rpartition("/")
808 return text_id.replace("\n", "")[:12]
809 except Exception:
810 return None
811
812 def generate_random_id():
813 return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
814
815 # Try getting docker id. If it fails, generate a random id
816 docker_id = get_docker_id()
817 return docker_id if docker_id else generate_random_id()
tierno16427352019-04-22 11:37:36 +0000818
tiernoc0e42e22018-05-11 11:36:10 +0200819
tierno275411e2018-05-16 14:33:32 +0200820def usage():
garciadeblas5697b8b2021-03-24 09:17:02 +0100821 print(
822 """Usage: {} [options]
quilesj7e13aeb2019-10-08 13:34:55 +0200823 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
tiernoa9843d82018-10-24 10:44:20 +0200824 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
tierno275411e2018-05-16 14:33:32 +0200825 -h|--help: shows this help
garciadeblas5697b8b2021-03-24 09:17:02 +0100826 """.format(
827 sys.argv[0]
828 )
829 )
tierno750b2452018-05-17 16:39:29 +0200830 # --log-socket-host HOST: send logs to this host")
831 # --log-socket-port PORT: send logs using this port (default: 9022)")
tierno275411e2018-05-16 14:33:32 +0200832
833
garciadeblas5697b8b2021-03-24 09:17:02 +0100834if __name__ == "__main__":
tierno275411e2018-05-16 14:33:32 +0200835 try:
tierno8c16b052020-02-05 15:08:32 +0000836 # print("SYS.PATH='{}'".format(sys.path))
tierno275411e2018-05-16 14:33:32 +0200837 # load parameters and configuration
quilesj7e13aeb2019-10-08 13:34:55 +0200838 # -h
839 # -c value
840 # --config value
841 # --help
842 # --health-check
garciadeblas5697b8b2021-03-24 09:17:02 +0100843 opts, args = getopt.getopt(
844 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
845 )
tierno275411e2018-05-16 14:33:32 +0200846 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
847 config_file = None
848 for o, a in opts:
849 if o in ("-h", "--help"):
850 usage()
851 sys.exit()
852 elif o in ("-c", "--config"):
853 config_file = a
tiernoa9843d82018-10-24 10:44:20 +0200854 elif o == "--health-check":
tierno94f06112020-02-11 12:38:19 +0000855 from osm_lcm.lcm_hc import health_check
garciadeblas5697b8b2021-03-24 09:17:02 +0100856
aticig56b86c22022-06-29 10:43:05 +0300857 health_check(config_file, Lcm.ping_interval_pace)
tierno275411e2018-05-16 14:33:32 +0200858 else:
Gabriel Cuba4c0e6802023-10-09 13:22:38 -0500859 print(f"Unhandled option: {o}")
860 exit(1)
quilesj7e13aeb2019-10-08 13:34:55 +0200861
tierno275411e2018-05-16 14:33:32 +0200862 if config_file:
863 if not path.isfile(config_file):
garciadeblas5697b8b2021-03-24 09:17:02 +0100864 print(
865 "configuration file '{}' does not exist".format(config_file),
866 file=sys.stderr,
867 )
tierno275411e2018-05-16 14:33:32 +0200868 exit(1)
869 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100870 for config_file in (
871 __file__[: __file__.rfind(".")] + ".cfg",
872 "./lcm.cfg",
873 "/etc/osm/lcm.cfg",
874 ):
tierno275411e2018-05-16 14:33:32 +0200875 if path.isfile(config_file):
876 break
877 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100878 print(
879 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
880 file=sys.stderr,
881 )
tierno275411e2018-05-16 14:33:32 +0200882 exit(1)
883 lcm = Lcm(config_file)
Mark Beierl1addc932023-05-18 15:11:34 -0400884 asyncio.run(lcm.start())
tierno22f4f9c2018-06-11 18:53:39 +0200885 except (LcmException, getopt.GetoptError) as e:
tierno275411e2018-05-16 14:33:32 +0200886 print(str(e), file=sys.stderr)
887 # usage()
888 exit(1)