blob: 45fd9ad86e69308f63f39f45a9a74bbc0899a68c [file] [log] [blame]
tiernoc0e42e22018-05-11 11:36:10 +02001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
tierno2e215512018-11-28 09:37:52 +00004##
5# Copyright 2018 Telefonica S.A.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18##
19
quilesj7e13aeb2019-10-08 13:34:55 +020020
21# DEBUG WITH PDB
quilesj7e13aeb2019-10-08 13:34:55 +020022import pdb
23
k4.rahul44306072023-05-05 14:24:31 +053024import os
tiernoc0e42e22018-05-11 11:36:10 +020025import asyncio
26import yaml
tierno275411e2018-05-16 14:33:32 +020027import logging
28import logging.handlers
29import getopt
tierno275411e2018-05-16 14:33:32 +020030import sys
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050031from random import SystemRandom
tierno59d22d22018-09-25 18:10:19 +020032
rshri932105f2024-07-05 15:11:55 +000033from osm_lcm import ns, vim_sdn, netslice, k8s
tierno69f0d382020-05-07 13:08:09 +000034from osm_lcm.ng_ro import NgRoException, NgRoClient
35from osm_lcm.ROclient import ROClient, ROClientException
quilesj7e13aeb2019-10-08 13:34:55 +020036
tierno94f06112020-02-11 12:38:19 +000037from time import time
tierno8069ce52019-08-28 15:34:33 +000038from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
tiernoa4dea5a2020-01-05 16:29:30 +000039from osm_lcm import version as lcm_version, version_date as lcm_version_date
tierno8069ce52019-08-28 15:34:33 +000040
bravof922c4172020-11-24 21:21:43 -030041from osm_common import msglocal, msgkafka
garciadeblas02a33122025-12-10 18:54:20 +010042from osm_common._version import version as common_version
tierno59d22d22018-09-25 18:10:19 +020043from osm_common.dbbase import DbException
tiernoc0e42e22018-05-11 11:36:10 +020044from osm_common.fsbase import FsException
45from osm_common.msgbase import MsgException
bravof922c4172020-11-24 21:21:43 -030046from osm_lcm.data_utils.database.database import Database
47from osm_lcm.data_utils.filesystem.filesystem import Filesystem
Luis Vegaa27dc532022-11-11 20:10:49 +000048from osm_lcm.data_utils.lcm_config import LcmCfg
garciadeblas96b94f52024-07-08 16:18:21 +020049from osm_lcm.data_utils.list_utils import find_in_list
aticig56b86c22022-06-29 10:43:05 +030050from osm_lcm.lcm_hc import get_health_check_file
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050051from os import path, getenv
almagiacdd20ae2024-12-13 09:45:45 +010052from osm_lcm.n2vc import version as n2vc_version
bravof922c4172020-11-24 21:21:43 -030053import traceback
tiernoc0e42e22018-05-11 11:36:10 +020054
Gabriel Cuba4c0e6802023-10-09 13:22:38 -050055if getenv("OSMLCM_PDB_DEBUG", None) is not None:
quilesj7e13aeb2019-10-08 13:34:55 +020056 pdb.set_trace()
57
tiernoc0e42e22018-05-11 11:36:10 +020058
tierno275411e2018-05-16 14:33:32 +020059__author__ = "Alfonso Tierno"
tiernoe64f7fb2019-09-11 08:55:52 +000060min_RO_version = "6.0.2"
tierno6e9d2eb2018-09-12 17:47:18 +020061min_n2vc_version = "0.0.2"
quilesj7e13aeb2019-10-08 13:34:55 +020062
tierno16427352019-04-22 11:37:36 +000063min_common_version = "0.1.19"
tierno275411e2018-05-16 14:33:32 +020064
65
tiernoc0e42e22018-05-11 11:36:10 +020066class Lcm:
garciadeblas96b94f52024-07-08 16:18:21 +020067 profile_collection_mapping = {
68 "infra_controller_profiles": "k8sinfra_controller",
69 "infra_config_profiles": "k8sinfra_config",
70 "resource_profiles": "k8sresource",
71 "app_profiles": "k8sapp",
72 }
73
garciadeblas5697b8b2021-03-24 09:17:02 +010074 ping_interval_pace = (
75 120 # how many time ping is send once is confirmed all is running
76 )
77 ping_interval_boot = 5 # how many time ping is sent when booting
Luis Vegaa27dc532022-11-11 20:10:49 +000078
79 main_config = LcmCfg()
tiernoa9843d82018-10-24 10:44:20 +020080
Gabriel Cubae7898982023-05-11 01:57:21 -050081 def __init__(self, config_file):
tiernoc0e42e22018-05-11 11:36:10 +020082 """
83 Init, Connect to database, filesystem storage, and messaging
84 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
85 :return: None
86 """
tiernoc0e42e22018-05-11 11:36:10 +020087 self.db = None
88 self.msg = None
tierno16427352019-04-22 11:37:36 +000089 self.msg_admin = None
tiernoc0e42e22018-05-11 11:36:10 +020090 self.fs = None
91 self.pings_not_received = 1
tiernoc2564fe2019-01-28 16:18:56 +000092 self.consecutive_errors = 0
93 self.first_start = False
tiernoc0e42e22018-05-11 11:36:10 +020094
tiernoc0e42e22018-05-11 11:36:10 +020095 # logging
garciadeblas5697b8b2021-03-24 09:17:02 +010096 self.logger = logging.getLogger("lcm")
tierno16427352019-04-22 11:37:36 +000097 # get id
98 self.worker_id = self.get_process_id()
tiernoc0e42e22018-05-11 11:36:10 +020099 # load configuration
100 config = self.read_config_file(config_file)
garciadeblase98a9182025-07-08 13:10:21 +0200101 self.logger.debug("Config from file" + str(config))
Luis Vegaa27dc532022-11-11 20:10:49 +0000102 self.main_config.set_from_dict(config)
garciadeblase98a9182025-07-08 13:10:21 +0200103 self.logger.debug("Main config" + str(self.main_config.to_dict()))
Luis Vegaa27dc532022-11-11 20:10:49 +0000104 self.main_config.transform()
garciadeblase98a9182025-07-08 13:10:21 +0200105 self.logger.debug("Main config" + str(self.main_config.to_dict()))
Luis Vegaa27dc532022-11-11 20:10:49 +0000106 self.main_config.load_from_env()
107 self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
108 # TODO: check if lcm_hc.py is necessary
109 self.health_check_file = get_health_check_file(self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +0100110 self.ns = (
111 self.netslice
112 ) = (
113 self.vim
rshri932105f2024-07-05 15:11:55 +0000114 ) = (
115 self.wim
116 ) = (
117 self.sdn
118 ) = (
119 self.k8scluster
120 ) = (
121 self.vca
122 ) = (
123 self.k8srepo
124 ) = (
125 self.cluster
126 ) = (
127 self.k8s_app
128 ) = self.k8s_resource = self.k8s_infra_controller = self.k8s_infra_config = None
tiernoc0e42e22018-05-11 11:36:10 +0200129
130 # logging
garciadeblas5697b8b2021-03-24 09:17:02 +0100131 log_format_simple = (
132 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
133 )
134 log_formatter_simple = logging.Formatter(
135 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
136 )
Luis Vegaa27dc532022-11-11 20:10:49 +0000137 if self.main_config.globalConfig.logfile:
garciadeblas5697b8b2021-03-24 09:17:02 +0100138 file_handler = logging.handlers.RotatingFileHandler(
Luis Vegaa27dc532022-11-11 20:10:49 +0000139 self.main_config.globalConfig.logfile,
140 maxBytes=100e6,
141 backupCount=9,
142 delay=0,
garciadeblas5697b8b2021-03-24 09:17:02 +0100143 )
tiernoc0e42e22018-05-11 11:36:10 +0200144 file_handler.setFormatter(log_formatter_simple)
145 self.logger.addHandler(file_handler)
Luis Vegaa27dc532022-11-11 20:10:49 +0000146 if not self.main_config.globalConfig.to_dict()["nologging"]:
tiernoc0e42e22018-05-11 11:36:10 +0200147 str_handler = logging.StreamHandler()
148 str_handler.setFormatter(log_formatter_simple)
149 self.logger.addHandler(str_handler)
150
Luis Vegaa27dc532022-11-11 20:10:49 +0000151 if self.main_config.globalConfig.to_dict()["loglevel"]:
152 self.logger.setLevel(self.main_config.globalConfig.loglevel)
tiernoc0e42e22018-05-11 11:36:10 +0200153
154 # logging other modules
garciadeblas40539872024-09-11 14:28:38 +0200155 for logger in ("message", "database", "storage", "tsdb", "gitops"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000156 logger_config = self.main_config.to_dict()[logger]
157 logger_module = logging.getLogger(logger_config["logger_name"])
158 if logger_config["logfile"]:
garciadeblas5697b8b2021-03-24 09:17:02 +0100159 file_handler = logging.handlers.RotatingFileHandler(
Luis Vegaa27dc532022-11-11 20:10:49 +0000160 logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
garciadeblas5697b8b2021-03-24 09:17:02 +0100161 )
tiernoc0e42e22018-05-11 11:36:10 +0200162 file_handler.setFormatter(log_formatter_simple)
163 logger_module.addHandler(file_handler)
Luis Vegaa27dc532022-11-11 20:10:49 +0000164 if logger_config["loglevel"]:
165 logger_module.setLevel(logger_config["loglevel"])
garciadeblas5697b8b2021-03-24 09:17:02 +0100166 self.logger.critical(
167 "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
168 )
tierno59d22d22018-09-25 18:10:19 +0200169
tiernoc0e42e22018-05-11 11:36:10 +0200170 # check version of N2VC
171 # TODO enhance with int conversion or from distutils.version import LooseVersion
172 # or with list(map(int, version.split(".")))
tierno59d22d22018-09-25 18:10:19 +0200173 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100174 raise LcmException(
175 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
176 n2vc_version, min_n2vc_version
177 )
178 )
tierno59d22d22018-09-25 18:10:19 +0200179 # check version of common
tierno27246d82018-09-27 15:59:09 +0200180 if versiontuple(common_version) < versiontuple(min_common_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100181 raise LcmException(
182 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
183 common_version, min_common_version
184 )
185 )
tierno22f4f9c2018-06-11 18:53:39 +0200186
tiernoc0e42e22018-05-11 11:36:10 +0200187 try:
Luis Vegaa27dc532022-11-11 20:10:49 +0000188 self.db = Database(self.main_config.to_dict()).instance.db
tiernoc0e42e22018-05-11 11:36:10 +0200189
Luis Vegaa27dc532022-11-11 20:10:49 +0000190 self.fs = Filesystem(self.main_config.to_dict()).instance.fs
sousaedu40365e82021-07-26 15:24:21 +0200191 self.fs.sync()
tiernoc0e42e22018-05-11 11:36:10 +0200192
quilesj7e13aeb2019-10-08 13:34:55 +0200193 # copy message configuration in order to remove 'group_id' for msg_admin
Luis Vegaa27dc532022-11-11 20:10:49 +0000194 config_message = self.main_config.message.to_dict()
Gabriel Cubae7898982023-05-11 01:57:21 -0500195 config_message["loop"] = asyncio.get_event_loop()
tiernoc2564fe2019-01-28 16:18:56 +0000196 if config_message["driver"] == "local":
tiernoc0e42e22018-05-11 11:36:10 +0200197 self.msg = msglocal.MsgLocal()
tiernoc2564fe2019-01-28 16:18:56 +0000198 self.msg.connect(config_message)
tierno16427352019-04-22 11:37:36 +0000199 self.msg_admin = msglocal.MsgLocal()
200 config_message.pop("group_id", None)
201 self.msg_admin.connect(config_message)
tiernoc2564fe2019-01-28 16:18:56 +0000202 elif config_message["driver"] == "kafka":
tiernoc0e42e22018-05-11 11:36:10 +0200203 self.msg = msgkafka.MsgKafka()
tiernoc2564fe2019-01-28 16:18:56 +0000204 self.msg.connect(config_message)
tierno16427352019-04-22 11:37:36 +0000205 self.msg_admin = msgkafka.MsgKafka()
206 config_message.pop("group_id", None)
207 self.msg_admin.connect(config_message)
tiernoc0e42e22018-05-11 11:36:10 +0200208 else:
garciadeblas5697b8b2021-03-24 09:17:02 +0100209 raise LcmException(
210 "Invalid configuration param '{}' at '[message]':'driver'".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000211 self.main_config.message.driver
garciadeblas5697b8b2021-03-24 09:17:02 +0100212 )
213 )
tiernoc0e42e22018-05-11 11:36:10 +0200214 except (DbException, FsException, MsgException) as e:
215 self.logger.critical(str(e), exc_info=True)
216 raise LcmException(str(e))
217
kuused124bfe2019-06-18 12:09:24 +0200218 # contains created tasks/futures to be able to cancel
bravof922c4172020-11-24 21:21:43 -0300219 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
kuused124bfe2019-06-18 12:09:24 +0200220
rshri932105f2024-07-05 15:11:55 +0000221 self.logger.info(
222 "Worker_id: {} main_config: {} lcm tasks: {}".format(
223 self.worker_id, self.main_config, self.lcm_tasks
224 )
225 )
226
tierno22f4f9c2018-06-11 18:53:39 +0200227 async def check_RO_version(self):
tiernoe64f7fb2019-09-11 08:55:52 +0000228 tries = 14
229 last_error = None
230 while True:
Luis Vegaa27dc532022-11-11 20:10:49 +0000231 ro_uri = self.main_config.RO.uri
232 if not ro_uri:
233 ro_uri = ""
tiernoe64f7fb2019-09-11 08:55:52 +0000234 try:
tierno2357f4e2020-10-19 16:38:59 +0000235 # try new RO, if fail old RO
236 try:
Luis Vegaa27dc532022-11-11 20:10:49 +0000237 self.main_config.RO.uri = ro_uri + "ro"
Gabriel Cubae7898982023-05-11 01:57:21 -0500238 ro_server = NgRoClient(**self.main_config.RO.to_dict())
tierno2357f4e2020-10-19 16:38:59 +0000239 ro_version = await ro_server.get_version()
Luis Vegaa27dc532022-11-11 20:10:49 +0000240 self.main_config.RO.ng = True
tierno2357f4e2020-10-19 16:38:59 +0000241 except Exception:
Luis Vegaa27dc532022-11-11 20:10:49 +0000242 self.main_config.RO.uri = ro_uri + "openmano"
Gabriel Cubae7898982023-05-11 01:57:21 -0500243 ro_server = ROClient(**self.main_config.RO.to_dict())
tierno2357f4e2020-10-19 16:38:59 +0000244 ro_version = await ro_server.get_version()
Luis Vegaa27dc532022-11-11 20:10:49 +0000245 self.main_config.RO.ng = False
tiernoe64f7fb2019-09-11 08:55:52 +0000246 if versiontuple(ro_version) < versiontuple(min_RO_version):
garciadeblas5697b8b2021-03-24 09:17:02 +0100247 raise LcmException(
248 "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
249 ro_version, min_RO_version
250 )
251 )
252 self.logger.info(
253 "Connected to RO version {} new-generation version {}".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000254 ro_version, self.main_config.RO.ng
garciadeblas5697b8b2021-03-24 09:17:02 +0100255 )
256 )
tiernoe64f7fb2019-09-11 08:55:52 +0000257 return
tierno69f0d382020-05-07 13:08:09 +0000258 except (ROClientException, NgRoException) as e:
Luis Vegaa27dc532022-11-11 20:10:49 +0000259 self.main_config.RO.uri = ro_uri
tiernoe64f7fb2019-09-11 08:55:52 +0000260 tries -= 1
bravof922c4172020-11-24 21:21:43 -0300261 traceback.print_tb(e.__traceback__)
garciadeblas5697b8b2021-03-24 09:17:02 +0100262 error_text = "Error while connecting to RO on {}: {}".format(
Luis Vegaa27dc532022-11-11 20:10:49 +0000263 self.main_config.RO.uri, e
garciadeblas5697b8b2021-03-24 09:17:02 +0100264 )
tiernoe64f7fb2019-09-11 08:55:52 +0000265 if tries <= 0:
266 self.logger.critical(error_text)
267 raise LcmException(error_text)
268 if last_error != error_text:
269 last_error = error_text
garciadeblas5697b8b2021-03-24 09:17:02 +0100270 self.logger.error(
271 error_text + ". Waiting until {} seconds".format(5 * tries)
272 )
tiernoe64f7fb2019-09-11 08:55:52 +0000273 await asyncio.sleep(5)
tierno22f4f9c2018-06-11 18:53:39 +0200274
tiernoc0e42e22018-05-11 11:36:10 +0200275 async def test(self, param=None):
276 self.logger.debug("Starting/Ending test task: {}".format(param))
277
tiernoc0e42e22018-05-11 11:36:10 +0200278 async def kafka_ping(self):
279 self.logger.debug("Task kafka_ping Enter")
280 consecutive_errors = 0
281 first_start = True
282 kafka_has_received = False
283 self.pings_not_received = 1
284 while True:
285 try:
tierno16427352019-04-22 11:37:36 +0000286 await self.msg_admin.aiowrite(
garciadeblas5697b8b2021-03-24 09:17:02 +0100287 "admin",
288 "ping",
289 {
290 "from": "lcm",
291 "to": "lcm",
292 "worker_id": self.worker_id,
293 "version": lcm_version,
294 },
garciadeblas5697b8b2021-03-24 09:17:02 +0100295 )
tiernoc0e42e22018-05-11 11:36:10 +0200296 # time between pings are low when it is not received and at starting
garciadeblas5697b8b2021-03-24 09:17:02 +0100297 wait_time = (
298 self.ping_interval_boot
299 if not kafka_has_received
300 else self.ping_interval_pace
301 )
tiernoc0e42e22018-05-11 11:36:10 +0200302 if not self.pings_not_received:
303 kafka_has_received = True
304 self.pings_not_received += 1
Gabriel Cubae7898982023-05-11 01:57:21 -0500305 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200306 if self.pings_not_received > 10:
307 raise LcmException("It is not receiving pings from Kafka bus")
308 consecutive_errors = 0
309 first_start = False
310 except LcmException:
311 raise
312 except Exception as e:
313 # if not first_start is the first time after starting. So leave more time and wait
314 # to allow kafka starts
315 if consecutive_errors == 8 if not first_start else 30:
garciadeblas5697b8b2021-03-24 09:17:02 +0100316 self.logger.error(
317 "Task kafka_read task exit error too many errors. Exception: {}".format(
318 e
319 )
320 )
tiernoc0e42e22018-05-11 11:36:10 +0200321 raise
322 consecutive_errors += 1
garciadeblas5697b8b2021-03-24 09:17:02 +0100323 self.logger.error(
324 "Task kafka_read retrying after Exception {}".format(e)
325 )
tierno16427352019-04-22 11:37:36 +0000326 wait_time = 2 if not first_start else 5
Gabriel Cubae7898982023-05-11 01:57:21 -0500327 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +0200328
garciadeblas96b94f52024-07-08 16:18:21 +0200329 def get_operation_params(self, item, operation_id):
330 operation_history = item.get("operationHistory", [])
331 operation = find_in_list(
332 operation_history, lambda op: op["op_id"] == operation_id
333 )
334 return operation.get("operationParams", {})
335
Gabriel Cubab6049d32023-10-30 13:44:49 -0500336 async def kafka_read_callback(self, topic, command, params):
gcalvinoed7f6d42018-12-14 14:44:56 +0100337 order_id = 1
garciadeblas53cd63a2024-09-23 10:12:22 +0200338 self.logger.debug(
rshri932105f2024-07-05 15:11:55 +0000339 "Topic: {} command: {} params: {} order ID: {}".format(
340 topic, command, params, order_id
341 )
342 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100343 if topic != "admin" and command != "ping":
garciadeblas53cd63a2024-09-23 10:12:22 +0200344 self.logger.info(
garciadeblas5697b8b2021-03-24 09:17:02 +0100345 "Task kafka_read receives {} {}: {}".format(topic, command, params)
346 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100347 self.consecutive_errors = 0
348 self.first_start = False
349 order_id += 1
garciadeblas53cd63a2024-09-23 10:12:22 +0200350 self.logger.debug(
rshri932105f2024-07-05 15:11:55 +0000351 "Consecutive error: {} First start: {}".format(
352 self.consecutive_errors, self.first_start
353 )
354 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100355 if command == "exit":
356 raise LcmExceptionExit
357 elif command.startswith("#"):
358 return
359 elif command == "echo":
360 # just for test
361 print(params)
362 sys.stdout.flush()
363 return
364 elif command == "test":
Gabriel Cubae7898982023-05-11 01:57:21 -0500365 asyncio.Task(self.test(params))
gcalvinoed7f6d42018-12-14 14:44:56 +0100366 return
367
368 if topic == "admin":
369 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
tierno16427352019-04-22 11:37:36 +0000370 if params.get("worker_id") != self.worker_id:
371 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100372 self.pings_not_received = 0
tierno3e359b12019-02-03 02:29:13 +0100373 try:
aticig56b86c22022-06-29 10:43:05 +0300374 with open(self.health_check_file, "w") as f:
tierno3e359b12019-02-03 02:29:13 +0100375 f.write(str(time()))
376 except Exception as e:
garciadeblas5697b8b2021-03-24 09:17:02 +0100377 self.logger.error(
378 "Cannot write into '{}' for healthcheck: {}".format(
aticig56b86c22022-06-29 10:43:05 +0300379 self.health_check_file, e
garciadeblas5697b8b2021-03-24 09:17:02 +0100380 )
381 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100382 return
Gabriel Cubab6049d32023-10-30 13:44:49 -0500383 elif topic == "nslcmops":
384 if command == "cancel":
385 nslcmop_id = params["_id"]
386 self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
387 nsr_id = params["nsInstanceId"]
388 # cancel the tasks and wait
389 for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
390 try:
391 await task
392 self.logger.debug(
393 "Cancelled task ended {},{},{}".format(
394 nsr_id, nslcmop_id, task
395 )
396 )
397 except asyncio.CancelledError:
398 self.logger.debug(
399 "Task already cancelled and finished {},{},{}".format(
400 nsr_id, nslcmop_id, task
401 )
402 )
403 # update DB
404 q_filter = {"_id": nslcmop_id}
405 update_dict = {
406 "operationState": "FAILED_TEMP",
407 "isCancelPending": False,
408 }
409 unset_dict = {
410 "cancelMode": None,
411 }
412 self.db.set_one(
413 "nslcmops",
414 q_filter=q_filter,
415 update_dict=update_dict,
416 fail_on_empty=False,
417 unset=unset_dict,
418 )
419 self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
420 return
magnussonle9198bb2020-01-21 13:00:51 +0100421 elif topic == "pla":
422 if command == "placement":
423 self.ns.update_nsrs_with_pla_result(params)
424 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100425 elif topic == "k8scluster":
426 if command == "create" or command == "created":
427 k8scluster_id = params.get("_id")
428 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100429 self.lcm_tasks.register(
430 "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
431 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100432 return
dariofaccin8bbeeb02023-01-23 18:13:27 +0100433 elif command == "edit" or command == "edited":
434 k8scluster_id = params.get("_id")
435 task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
436 self.lcm_tasks.register(
437 "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
438 )
439 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100440 elif command == "delete" or command == "deleted":
441 k8scluster_id = params.get("_id")
442 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100443 self.lcm_tasks.register(
444 "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
445 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100446 return
David Garciac1fe90a2021-03-31 19:12:02 +0200447 elif topic == "vca":
448 if command == "create" or command == "created":
449 vca_id = params.get("_id")
450 task = asyncio.ensure_future(self.vca.create(params, order_id))
451 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
452 return
Dario Faccin8e53c6d2023-01-10 10:38:41 +0000453 elif command == "edit" or command == "edited":
454 vca_id = params.get("_id")
455 task = asyncio.ensure_future(self.vca.edit(params, order_id))
456 self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
457 return
David Garciac1fe90a2021-03-31 19:12:02 +0200458 elif command == "delete" or command == "deleted":
459 vca_id = params.get("_id")
460 task = asyncio.ensure_future(self.vca.delete(params, order_id))
461 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
462 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100463 elif topic == "k8srepo":
464 if command == "create" or command == "created":
465 k8srepo_id = params.get("_id")
466 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
467 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100468 self.lcm_tasks.register(
469 "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
470 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100471 return
472 elif command == "delete" or command == "deleted":
473 k8srepo_id = params.get("_id")
474 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100475 self.lcm_tasks.register(
476 "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
477 )
calvinosanch9f9c6f22019-11-04 13:37:39 +0100478 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100479 elif topic == "ns":
tierno307425f2020-01-26 23:35:59 +0000480 if command == "instantiate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100481 # self.logger.debug("Deploying NS {}".format(nsr_id))
rshri932105f2024-07-05 15:11:55 +0000482 self.logger.info("NS instantiate")
gcalvinoed7f6d42018-12-14 14:44:56 +0100483 nslcmop = params
484 nslcmop_id = nslcmop["_id"]
485 nsr_id = nslcmop["nsInstanceId"]
rshri932105f2024-07-05 15:11:55 +0000486 self.logger.info(
487 "NsLCMOP: {} NsLCMOP_ID:{} nsr_id: {}".format(
488 nslcmop, nslcmop_id, nsr_id
489 )
490 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100491 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100492 self.lcm_tasks.register(
493 "ns", nsr_id, nslcmop_id, "ns_instantiate", task
494 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100495 return
tierno307425f2020-01-26 23:35:59 +0000496 elif command == "terminate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100497 # self.logger.debug("Deleting NS {}".format(nsr_id))
498 nslcmop = params
499 nslcmop_id = nslcmop["_id"]
500 nsr_id = nslcmop["nsInstanceId"]
501 self.lcm_tasks.cancel(topic, nsr_id)
502 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
503 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
504 return
ksaikiranr3fde2c72021-03-15 10:39:06 +0530505 elif command == "vca_status_refresh":
506 nslcmop = params
507 nslcmop_id = nslcmop["_id"]
508 nsr_id = nslcmop["nsInstanceId"]
garciadeblas5697b8b2021-03-24 09:17:02 +0100509 task = asyncio.ensure_future(
510 self.ns.vca_status_refresh(nsr_id, nslcmop_id)
511 )
512 self.lcm_tasks.register(
513 "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
514 )
ksaikiranr3fde2c72021-03-15 10:39:06 +0530515 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100516 elif command == "action":
517 # self.logger.debug("Update NS {}".format(nsr_id))
518 nslcmop = params
519 nslcmop_id = nslcmop["_id"]
520 nsr_id = nslcmop["nsInstanceId"]
521 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
522 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
523 return
aticigdffa6212022-04-12 15:27:53 +0300524 elif command == "update":
525 # self.logger.debug("Update NS {}".format(nsr_id))
526 nslcmop = params
527 nslcmop_id = nslcmop["_id"]
528 nsr_id = nslcmop["nsInstanceId"]
529 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
530 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
531 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100532 elif command == "scale":
533 # self.logger.debug("Update NS {}".format(nsr_id))
534 nslcmop = params
535 nslcmop_id = nslcmop["_id"]
536 nsr_id = nslcmop["nsInstanceId"]
537 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
538 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
539 return
garciadeblas07f4e4c2022-06-09 09:42:58 +0200540 elif command == "heal":
541 # self.logger.debug("Healing NS {}".format(nsr_id))
542 nslcmop = params
543 nslcmop_id = nslcmop["_id"]
544 nsr_id = nslcmop["nsInstanceId"]
545 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
preethika.p28b0bf82022-09-23 07:36:28 +0000546 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
garciadeblas07f4e4c2022-06-09 09:42:58 +0200547 return
elumalai80bcf1c2022-04-28 18:05:01 +0530548 elif command == "migrate":
549 nslcmop = params
550 nslcmop_id = nslcmop["_id"]
551 nsr_id = nslcmop["nsInstanceId"]
552 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
553 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
554 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100555 elif command == "show":
tiernoc2564fe2019-01-28 16:18:56 +0000556 nsr_id = params
gcalvinoed7f6d42018-12-14 14:44:56 +0100557 try:
558 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
garciadeblas5697b8b2021-03-24 09:17:02 +0100559 print(
560 "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
561 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
562 "".format(
563 nsr_id,
564 db_nsr["operational-status"],
565 db_nsr["config-status"],
566 db_nsr["detailed-status"],
567 db_nsr["_admin"]["deployed"],
Gabriel Cuba411af2e2023-01-06 17:23:22 -0500568 self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
garciadeblas5697b8b2021-03-24 09:17:02 +0100569 )
570 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100571 except Exception as e:
572 print("nsr {} not found: {}".format(nsr_id, e))
573 sys.stdout.flush()
574 return
575 elif command == "deleted":
576 return # TODO cleaning of task just in case should be done
garciadeblas5697b8b2021-03-24 09:17:02 +0100577 elif command in (
elumalaica7ece02022-04-12 12:47:32 +0530578 "vnf_terminated",
elumalaib9e357c2022-04-27 09:58:38 +0530579 "policy_updated",
garciadeblas5697b8b2021-03-24 09:17:02 +0100580 "terminated",
581 "instantiated",
582 "scaled",
garciadeblas07f4e4c2022-06-09 09:42:58 +0200583 "healed",
garciadeblas5697b8b2021-03-24 09:17:02 +0100584 "actioned",
aticigdffa6212022-04-12 15:27:53 +0300585 "updated",
elumalai80bcf1c2022-04-28 18:05:01 +0530586 "migrated",
rshri932105f2024-07-05 15:11:55 +0000587 "verticalscaled",
garciadeblas5697b8b2021-03-24 09:17:02 +0100588 ): # "scaled-cooldown-time"
gcalvinoed7f6d42018-12-14 14:44:56 +0100589 return
elumalaica7ece02022-04-12 12:47:32 +0530590
gcalvinoed7f6d42018-12-14 14:44:56 +0100591 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
tierno307425f2020-01-26 23:35:59 +0000592 if command == "instantiate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100593 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
594 nsilcmop = params
595 nsilcmop_id = nsilcmop["_id"] # slice operation id
596 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
garciadeblas5697b8b2021-03-24 09:17:02 +0100597 task = asyncio.ensure_future(
598 self.netslice.instantiate(nsir_id, nsilcmop_id)
599 )
600 self.lcm_tasks.register(
601 "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
602 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100603 return
tierno307425f2020-01-26 23:35:59 +0000604 elif command == "terminate":
gcalvinoed7f6d42018-12-14 14:44:56 +0100605 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
606 nsilcmop = params
607 nsilcmop_id = nsilcmop["_id"] # slice operation id
608 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
609 self.lcm_tasks.cancel(topic, nsir_id)
garciadeblas5697b8b2021-03-24 09:17:02 +0100610 task = asyncio.ensure_future(
611 self.netslice.terminate(nsir_id, nsilcmop_id)
612 )
613 self.lcm_tasks.register(
614 "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
615 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100616 return
617 elif command == "show":
tiernoc2564fe2019-01-28 16:18:56 +0000618 nsir_id = params
gcalvinoed7f6d42018-12-14 14:44:56 +0100619 try:
620 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
garciadeblas5697b8b2021-03-24 09:17:02 +0100621 print(
622 "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
623 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
624 "".format(
625 nsir_id,
626 db_nsir["operational-status"],
627 db_nsir["config-status"],
628 db_nsir["detailed-status"],
629 db_nsir["_admin"]["deployed"],
Gabriel Cuba411af2e2023-01-06 17:23:22 -0500630 self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
garciadeblas5697b8b2021-03-24 09:17:02 +0100631 )
632 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100633 except Exception as e:
634 print("nsir {} not found: {}".format(nsir_id, e))
635 sys.stdout.flush()
636 return
637 elif command == "deleted":
638 return # TODO cleaning of task just in case should be done
garciadeblas5697b8b2021-03-24 09:17:02 +0100639 elif command in (
640 "terminated",
641 "instantiated",
642 "scaled",
garciadeblas07f4e4c2022-06-09 09:42:58 +0200643 "healed",
garciadeblas5697b8b2021-03-24 09:17:02 +0100644 "actioned",
645 ): # "scaled-cooldown-time"
gcalvinoed7f6d42018-12-14 14:44:56 +0100646 return
647 elif topic == "vim_account":
648 vim_id = params["_id"]
garciadeblas96b94f52024-07-08 16:18:21 +0200649 op_id = vim_id
yshah771dea82024-07-05 15:11:49 +0000650 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
garciadeblas96b94f52024-07-08 16:18:21 +0200651 vim_config = db_vim.get("config", {})
tiernof210c1c2019-10-16 09:09:58 +0000652 if command in ("create", "created"):
garciadeblas96b94f52024-07-08 16:18:21 +0200653 self.logger.debug("Main config: {}".format(self.main_config.to_dict()))
garciadeblasc0db9e22025-04-03 13:43:57 +0200654 if "credentials" in vim_config or "credentials_base64" in vim_config:
garciadeblas96b94f52024-07-08 16:18:21 +0200655 self.logger.info("Vim add cloud credentials")
656 task = asyncio.ensure_future(
yshah564ec9c2024-11-29 07:33:32 +0000657 self.cloud_credentials.add(params, order_id)
garciadeblas96b94f52024-07-08 16:18:21 +0200658 )
659 self.lcm_tasks.register(
660 "vim_account", vim_id, op_id, "cloud_credentials_add", task
661 )
Luis Vegaa27dc532022-11-11 20:10:49 +0000662 if not self.main_config.RO.ng:
garciadeblas96b94f52024-07-08 16:18:21 +0200663 self.logger.info("Calling RO to create VIM (no NG-RO)")
tierno2357f4e2020-10-19 16:38:59 +0000664 task = asyncio.ensure_future(self.vim.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100665 self.lcm_tasks.register(
666 "vim_account", vim_id, order_id, "vim_create", task
667 )
garciadeblas96b94f52024-07-08 16:18:21 +0200668 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100669 elif command == "delete" or command == "deleted":
garciadeblas96b94f52024-07-08 16:18:21 +0200670 self.lcm_tasks.cancel(topic, vim_id)
garciadeblasc0db9e22025-04-03 13:43:57 +0200671 if "credentials" in vim_config or "credentials_base64" in vim_config:
garciadeblas96b94f52024-07-08 16:18:21 +0200672 self.logger.info("Vim remove cloud credentials")
yshah771dea82024-07-05 15:11:49 +0000673 task = asyncio.ensure_future(
yshah564ec9c2024-11-29 07:33:32 +0000674 self.cloud_credentials.remove(params, order_id)
yshah771dea82024-07-05 15:11:49 +0000675 )
676 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200677 "vim_account", vim_id, op_id, "cloud_credentials_remove", task
yshah771dea82024-07-05 15:11:49 +0000678 )
garciadeblas96b94f52024-07-08 16:18:21 +0200679 task = asyncio.ensure_future(self.vim.delete(params, order_id))
680 self.lcm_tasks.register(
681 "vim_account", vim_id, order_id, "vim_delete", task
682 )
683 return
gcalvinoed7f6d42018-12-14 14:44:56 +0100684 elif command == "show":
685 print("not implemented show with vim_account")
686 sys.stdout.flush()
687 return
tiernof210c1c2019-10-16 09:09:58 +0000688 elif command in ("edit", "edited"):
garciadeblasc0db9e22025-04-03 13:43:57 +0200689 if "credentials" in vim_config or "credentials_base64" in vim_config:
garciadeblas96b94f52024-07-08 16:18:21 +0200690 self.logger.info("Vim update cloud credentials")
yshah771dea82024-07-05 15:11:49 +0000691 task = asyncio.ensure_future(
yshah564ec9c2024-11-29 07:33:32 +0000692 self.cloud_credentials.edit(params, order_id)
yshah771dea82024-07-05 15:11:49 +0000693 )
694 self.lcm_tasks.register(
695 "vim_account", vim_id, op_id, "cloud_credentials_update", task
696 )
Luis Vegaa27dc532022-11-11 20:10:49 +0000697 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000698 task = asyncio.ensure_future(self.vim.edit(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100699 self.lcm_tasks.register(
700 "vim_account", vim_id, order_id, "vim_edit", task
701 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100702 return
tiernof210c1c2019-10-16 09:09:58 +0000703 elif command == "deleted":
704 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100705 elif topic == "wim_account":
706 wim_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000707 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000708 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000709 task = asyncio.ensure_future(self.wim.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100710 self.lcm_tasks.register(
711 "wim_account", wim_id, order_id, "wim_create", task
712 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100713 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100714 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100715 self.lcm_tasks.cancel(topic, wim_id)
kuuse6a470c62019-07-10 13:52:45 +0200716 task = asyncio.ensure_future(self.wim.delete(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100717 self.lcm_tasks.register(
718 "wim_account", wim_id, order_id, "wim_delete", task
719 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100720 return
721 elif command == "show":
722 print("not implemented show with wim_account")
723 sys.stdout.flush()
724 return
tiernof210c1c2019-10-16 09:09:58 +0000725 elif command in ("edit", "edited"):
gcalvinoed7f6d42018-12-14 14:44:56 +0100726 task = asyncio.ensure_future(self.wim.edit(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100727 self.lcm_tasks.register(
728 "wim_account", wim_id, order_id, "wim_edit", task
729 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100730 return
tiernof210c1c2019-10-16 09:09:58 +0000731 elif command == "deleted":
732 return # TODO cleaning of task just in case should be done
gcalvinoed7f6d42018-12-14 14:44:56 +0100733 elif topic == "sdn":
734 _sdn_id = params["_id"]
tiernof210c1c2019-10-16 09:09:58 +0000735 if command in ("create", "created"):
Luis Vegaa27dc532022-11-11 20:10:49 +0000736 if not self.main_config.RO.ng:
tierno2357f4e2020-10-19 16:38:59 +0000737 task = asyncio.ensure_future(self.sdn.create(params, order_id))
garciadeblas5697b8b2021-03-24 09:17:02 +0100738 self.lcm_tasks.register(
739 "sdn", _sdn_id, order_id, "sdn_create", task
740 )
gcalvinoed7f6d42018-12-14 14:44:56 +0100741 return
calvinosanch9f9c6f22019-11-04 13:37:39 +0100742 elif command == "delete" or command == "deleted":
gcalvinoed7f6d42018-12-14 14:44:56 +0100743 self.lcm_tasks.cancel(topic, _sdn_id)
kuuse6a470c62019-07-10 13:52:45 +0200744 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
gcalvinoed7f6d42018-12-14 14:44:56 +0100745 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
746 return
tiernof210c1c2019-10-16 09:09:58 +0000747 elif command in ("edit", "edited"):
gcalvinoed7f6d42018-12-14 14:44:56 +0100748 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
749 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
750 return
tiernof210c1c2019-10-16 09:09:58 +0000751 elif command == "deleted":
752 return # TODO cleaning of task just in case should be done
rshri932105f2024-07-05 15:11:55 +0000753 elif topic == "cluster":
rshri948f7de2024-12-02 03:42:35 +0000754 cluster_id = params["cluster_id"]
755 op_id = params["operation_id"]
rshri932105f2024-07-05 15:11:55 +0000756 if command == "create" or command == "created":
rshri932105f2024-07-05 15:11:55 +0000757 self.logger.debug("cluster_id = {}".format(cluster_id))
rshri948f7de2024-12-02 03:42:35 +0000758 task = asyncio.ensure_future(self.cluster.create(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000759 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200760 "cluster", cluster_id, op_id, "cluster_create", task
rshri932105f2024-07-05 15:11:55 +0000761 )
762 return
763 elif command == "delete" or command == "deleted":
rshri948f7de2024-12-02 03:42:35 +0000764 task = asyncio.ensure_future(self.cluster.delete(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000765 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200766 "cluster", cluster_id, op_id, "cluster_delete", task
rshri932105f2024-07-05 15:11:55 +0000767 )
768 return
769 elif command == "add" or command == "added":
garciadeblas96b94f52024-07-08 16:18:21 +0200770 task = asyncio.ensure_future(
rshri948f7de2024-12-02 03:42:35 +0000771 self.cluster.attach_profile(params, order_id)
garciadeblas96b94f52024-07-08 16:18:21 +0200772 )
rshri932105f2024-07-05 15:11:55 +0000773 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200774 "cluster", cluster_id, op_id, "profile_add", task
rshri932105f2024-07-05 15:11:55 +0000775 )
776 return
777 elif command == "remove" or command == "removed":
garciadeblas96b94f52024-07-08 16:18:21 +0200778 task = asyncio.ensure_future(
rshri948f7de2024-12-02 03:42:35 +0000779 self.cluster.detach_profile(params, order_id)
garciadeblas96b94f52024-07-08 16:18:21 +0200780 )
rshri932105f2024-07-05 15:11:55 +0000781 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200782 "cluster", cluster_id, op_id, "profile_remove", task
rshri932105f2024-07-05 15:11:55 +0000783 )
784 return
785 elif command == "register" or command == "registered":
rshri948f7de2024-12-02 03:42:35 +0000786 task = asyncio.ensure_future(self.cluster.register(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000787 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200788 "cluster", cluster_id, op_id, "cluster_register", task
rshri932105f2024-07-05 15:11:55 +0000789 )
790 return
791 elif command == "deregister" or command == "deregistered":
rshri948f7de2024-12-02 03:42:35 +0000792 task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000793 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200794 "cluster", cluster_id, op_id, "cluster_deregister", task
rshri932105f2024-07-05 15:11:55 +0000795 )
796 return
yshah771dea82024-07-05 15:11:49 +0000797 elif command == "get_creds":
rshri948f7de2024-12-02 03:42:35 +0000798 task = asyncio.ensure_future(self.cluster.get_creds(params, order_id))
yshah771dea82024-07-05 15:11:49 +0000799 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200800 "cluster", cluster_id, cluster_id, "cluster_get_credentials", task
yshah771dea82024-07-05 15:11:49 +0000801 )
802 return
yshah0defcd52024-11-18 07:41:35 +0000803 elif command == "upgrade" or command == "scale" or command == "update":
rshri948f7de2024-12-02 03:42:35 +0000804 cluster_id = params["cluster_id"]
805 op_id = params["operation_id"]
garciadeblas96b94f52024-07-08 16:18:21 +0200806 # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
Your Name86149632024-11-14 16:17:16 +0000807 """
garciadeblas96b94f52024-07-08 16:18:21 +0200808 db_vim = self.db.get_one(
809 "vim_accounts", {"name": db_cluster["vim_account"]}
810 )
811 db_content["vim_account"] = db_vim
Your Name86149632024-11-14 16:17:16 +0000812 """
rshri948f7de2024-12-02 03:42:35 +0000813 task = asyncio.ensure_future(self.cluster.update(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200814 self.lcm_tasks.register(
815 "cluster", cluster_id, op_id, "cluster_update", task
816 )
yshah771dea82024-07-05 15:11:49 +0000817 return
rshri932105f2024-07-05 15:11:55 +0000818 elif topic == "k8s_app":
garciadeblas96b94f52024-07-08 16:18:21 +0200819 op_id = params["operation_id"]
820 profile_id = params["profile_id"]
rshri932105f2024-07-05 15:11:55 +0000821 if command == "profile_create" or command == "profile_created":
garciadeblas96b94f52024-07-08 16:18:21 +0200822 self.logger.debug("Create k8s_app_id = {}".format(profile_id))
rshri948f7de2024-12-02 03:42:35 +0000823 task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000824 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200825 "k8s_app", profile_id, op_id, "k8s_app_create", task
rshri932105f2024-07-05 15:11:55 +0000826 )
827 return
828 elif command == "delete" or command == "deleted":
garciadeblas96b94f52024-07-08 16:18:21 +0200829 self.logger.debug("Delete k8s_app_id = {}".format(profile_id))
rshri948f7de2024-12-02 03:42:35 +0000830 task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000831 self.lcm_tasks.register(
garciadeblas96b94f52024-07-08 16:18:21 +0200832 "k8s_app", profile_id, op_id, "k8s_app_delete", task
rshri932105f2024-07-05 15:11:55 +0000833 )
834 return
835 elif topic == "k8s_resource":
garciadeblas96b94f52024-07-08 16:18:21 +0200836 op_id = params["operation_id"]
837 profile_id = params["profile_id"]
rshri932105f2024-07-05 15:11:55 +0000838 if command == "profile_create" or command == "profile_created":
garciadeblas96b94f52024-07-08 16:18:21 +0200839 self.logger.debug("Create k8s_resource_id = {}".format(profile_id))
rshri948f7de2024-12-02 03:42:35 +0000840 task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000841 self.lcm_tasks.register(
842 "k8s_resource",
garciadeblas96b94f52024-07-08 16:18:21 +0200843 profile_id,
844 op_id,
rshri932105f2024-07-05 15:11:55 +0000845 "k8s_resource_create",
846 task,
847 )
848 return
849 elif command == "delete" or command == "deleted":
garciadeblas96b94f52024-07-08 16:18:21 +0200850 self.logger.debug("Delete k8s_resource_id = {}".format(profile_id))
rshri948f7de2024-12-02 03:42:35 +0000851 task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
rshri932105f2024-07-05 15:11:55 +0000852 self.lcm_tasks.register(
853 "k8s_resource",
garciadeblas96b94f52024-07-08 16:18:21 +0200854 profile_id,
855 op_id,
rshri932105f2024-07-05 15:11:55 +0000856 "k8s_resource_delete",
857 task,
858 )
859 return
860
861 elif topic == "k8s_infra_controller":
garciadeblas96b94f52024-07-08 16:18:21 +0200862 op_id = params["operation_id"]
863 profile_id = params["profile_id"]
rshri932105f2024-07-05 15:11:55 +0000864 if command == "profile_create" or command == "profile_created":
rshri932105f2024-07-05 15:11:55 +0000865 self.logger.debug(
garciadeblas96b94f52024-07-08 16:18:21 +0200866 "Create k8s_infra_controller_id = {}".format(profile_id)
rshri932105f2024-07-05 15:11:55 +0000867 )
868 task = asyncio.ensure_future(
rshri948f7de2024-12-02 03:42:35 +0000869 self.k8s_infra_controller.create(params, order_id)
rshri932105f2024-07-05 15:11:55 +0000870 )
871 self.lcm_tasks.register(
872 "k8s_infra_controller",
garciadeblas96b94f52024-07-08 16:18:21 +0200873 profile_id,
874 op_id,
rshri932105f2024-07-05 15:11:55 +0000875 "k8s_infra_controller_create",
876 task,
877 )
878 return
879 elif command == "delete" or command == "deleted":
garciadeblas96b94f52024-07-08 16:18:21 +0200880 self.logger.debug(
881 "Delete k8s_infra_controller_id = {}".format(profile_id)
882 )
rshri932105f2024-07-05 15:11:55 +0000883 task = asyncio.ensure_future(
rshri948f7de2024-12-02 03:42:35 +0000884 self.k8s_infra_controller.delete(params, order_id)
rshri932105f2024-07-05 15:11:55 +0000885 )
886 self.lcm_tasks.register(
887 "k8s_infra_controller",
garciadeblas96b94f52024-07-08 16:18:21 +0200888 profile_id,
889 op_id,
rshri932105f2024-07-05 15:11:55 +0000890 "k8s_infra_controller_delete",
891 task,
892 )
893 return
894
895 elif topic == "k8s_infra_config":
garciadeblas96b94f52024-07-08 16:18:21 +0200896 op_id = params["operation_id"]
897 profile_id = params["profile_id"]
rshri932105f2024-07-05 15:11:55 +0000898 if command == "profile_create" or command == "profile_created":
garciadeblas96b94f52024-07-08 16:18:21 +0200899 self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id))
rshri932105f2024-07-05 15:11:55 +0000900 task = asyncio.ensure_future(
rshri948f7de2024-12-02 03:42:35 +0000901 self.k8s_infra_config.create(params, order_id)
rshri932105f2024-07-05 15:11:55 +0000902 )
903 self.lcm_tasks.register(
904 "k8s_infra_config",
garciadeblas96b94f52024-07-08 16:18:21 +0200905 profile_id,
906 op_id,
rshri932105f2024-07-05 15:11:55 +0000907 "k8s_infra_config_create",
908 task,
909 )
910 return
911 elif command == "delete" or command == "deleted":
garciadeblas96b94f52024-07-08 16:18:21 +0200912 self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id))
rshri932105f2024-07-05 15:11:55 +0000913 task = asyncio.ensure_future(
rshri948f7de2024-12-02 03:42:35 +0000914 self.k8s_infra_config.delete(params, order_id)
rshri932105f2024-07-05 15:11:55 +0000915 )
916 self.lcm_tasks.register(
917 "k8s_infra_config",
garciadeblas96b94f52024-07-08 16:18:21 +0200918 profile_id,
919 op_id,
rshri932105f2024-07-05 15:11:55 +0000920 "k8s_infra_config_delete",
921 task,
922 )
923 return
yshah771dea82024-07-05 15:11:49 +0000924 elif topic == "oka":
garciadeblas96b94f52024-07-08 16:18:21 +0200925 op_id = params["operation_id"]
926 oka_id = params["oka_id"]
yshah771dea82024-07-05 15:11:49 +0000927 if command == "create":
yshah564ec9c2024-11-29 07:33:32 +0000928 task = asyncio.ensure_future(self.oka.create(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200929 self.lcm_tasks.register("oka", oka_id, op_id, "oka_create", task)
yshah771dea82024-07-05 15:11:49 +0000930 return
931 elif command == "edit":
yshah564ec9c2024-11-29 07:33:32 +0000932 task = asyncio.ensure_future(self.oka.edit(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200933 self.lcm_tasks.register("oka", oka_id, op_id, "oka_edit", task)
yshah771dea82024-07-05 15:11:49 +0000934 return
935 elif command == "delete":
yshah564ec9c2024-11-29 07:33:32 +0000936 task = asyncio.ensure_future(self.oka.delete(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200937 self.lcm_tasks.register("oka", oka_id, op_id, "oka_delete", task)
yshah771dea82024-07-05 15:11:49 +0000938 return
939 elif topic == "ksu":
garciadeblas96b94f52024-07-08 16:18:21 +0200940 op_id = params["operation_id"]
yshah564ec9c2024-11-29 07:33:32 +0000941 ksu_id = op_id
yshah771dea82024-07-05 15:11:49 +0000942 if command == "create":
yshah564ec9c2024-11-29 07:33:32 +0000943 task = asyncio.ensure_future(self.ksu.create(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200944 self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_create", task)
yshah771dea82024-07-05 15:11:49 +0000945 return
946 elif command == "edit" or command == "edited":
yshah564ec9c2024-11-29 07:33:32 +0000947 task = asyncio.ensure_future(self.ksu.edit(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200948 self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_edit", task)
yshah771dea82024-07-05 15:11:49 +0000949 return
950 elif command == "delete":
yshah564ec9c2024-11-29 07:33:32 +0000951 task = asyncio.ensure_future(self.ksu.delete(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200952 self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_delete", task)
yshah771dea82024-07-05 15:11:49 +0000953 return
954 elif command == "clone":
yshah564ec9c2024-11-29 07:33:32 +0000955 task = asyncio.ensure_future(self.ksu.clone(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200956 self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_clone", task)
yshah771dea82024-07-05 15:11:49 +0000957 return
958 elif command == "move":
yshah564ec9c2024-11-29 07:33:32 +0000959 task = asyncio.ensure_future(self.ksu.move(params, order_id))
garciadeblas96b94f52024-07-08 16:18:21 +0200960 self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task)
yshah771dea82024-07-05 15:11:49 +0000961 return
garciadeblas61a4c692025-07-17 13:04:13 +0200962 elif topic == "appinstance":
963 op_id = params["operation_id"]
964 appinstance_id = params["appinstance"]
965 if command == "create":
966 task = asyncio.ensure_future(self.appinstance.create(params, order_id))
967 self.lcm_tasks.register(
968 "appinstance", appinstance_id, op_id, "app_create", task
969 )
970 return
971 elif command == "update" or command == "updated":
972 task = asyncio.ensure_future(self.appinstance.update(params, order_id))
973 self.lcm_tasks.register(
974 "appinstance", appinstance_id, op_id, "app_edit", task
975 )
976 return
977 elif command == "delete":
978 task = asyncio.ensure_future(self.appinstance.delete(params, order_id))
979 self.lcm_tasks.register(
980 "appinstance", appinstance_id, op_id, "app_delete", task
981 )
982 return
yshah83a30572025-06-13 08:38:49 +0000983 elif topic == "nodegroup":
984 nodegroup_id = params["nodegroup_id"]
985 op_id = params["operation_id"]
986 if command == "add_nodegroup":
987 task = asyncio.ensure_future(self.nodegroup.create(params, order_id))
988 self.lcm_tasks.register(
989 "nodegroup", nodegroup_id, op_id, "add_node", task
990 )
991 return
992 elif command == "scale_nodegroup":
993 task = asyncio.ensure_future(self.nodegroup.scale(params, order_id))
994 self.lcm_tasks.register(
995 "nodegroup", nodegroup_id, op_id, "scale_node", task
996 )
997 return
998 elif command == "delete_nodegroup":
999 task = asyncio.ensure_future(self.nodegroup.delete(params, order_id))
1000 self.lcm_tasks.register(
1001 "nodegroup", nodegroup_id, op_id, "delete_node", task
1002 )
1003 return
rshri932105f2024-07-05 15:11:55 +00001004
gcalvinoed7f6d42018-12-14 14:44:56 +01001005 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1006
tiernoc0e42e22018-05-11 11:36:10 +02001007 async def kafka_read(self):
garciadeblas5697b8b2021-03-24 09:17:02 +01001008 self.logger.debug(
1009 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
1010 )
gcalvinoed7f6d42018-12-14 14:44:56 +01001011 self.consecutive_errors = 0
1012 self.first_start = True
1013 while self.consecutive_errors < 10:
tiernoc0e42e22018-05-11 11:36:10 +02001014 try:
garciadeblas5697b8b2021-03-24 09:17:02 +01001015 topics = (
1016 "ns",
1017 "vim_account",
1018 "wim_account",
1019 "sdn",
1020 "nsi",
1021 "k8scluster",
1022 "vca",
1023 "k8srepo",
1024 "pla",
Gabriel Cubab6049d32023-10-30 13:44:49 -05001025 "nslcmops",
rshri932105f2024-07-05 15:11:55 +00001026 "cluster",
1027 "k8s_app",
1028 "k8s_resource",
1029 "k8s_infra_controller",
1030 "k8s_infra_config",
yshah771dea82024-07-05 15:11:49 +00001031 "oka",
1032 "ksu",
garciadeblas61a4c692025-07-17 13:04:13 +02001033 "appinstance",
yshah83a30572025-06-13 08:38:49 +00001034 "nodegroup",
rshri932105f2024-07-05 15:11:55 +00001035 )
garciadeblas5c12d9f2024-09-17 11:38:51 +02001036 self.logger.debug(
rshri932105f2024-07-05 15:11:55 +00001037 "Consecutive errors: {} first start: {}".format(
1038 self.consecutive_errors, self.first_start
1039 )
garciadeblas5697b8b2021-03-24 09:17:02 +01001040 )
1041 topics_admin = ("admin",)
tierno16427352019-04-22 11:37:36 +00001042 await asyncio.gather(
garciadeblas5697b8b2021-03-24 09:17:02 +01001043 self.msg.aioread(
Gabriel Cubab6049d32023-10-30 13:44:49 -05001044 topics,
1045 aiocallback=self.kafka_read_callback,
1046 from_beginning=True,
garciadeblas5697b8b2021-03-24 09:17:02 +01001047 ),
1048 self.msg_admin.aioread(
1049 topics_admin,
Gabriel Cubab6049d32023-10-30 13:44:49 -05001050 aiocallback=self.kafka_read_callback,
garciadeblas5697b8b2021-03-24 09:17:02 +01001051 group_id=False,
1052 ),
tierno16427352019-04-22 11:37:36 +00001053 )
tiernoc0e42e22018-05-11 11:36:10 +02001054
gcalvinoed7f6d42018-12-14 14:44:56 +01001055 except LcmExceptionExit:
1056 self.logger.debug("Bye!")
1057 break
tiernoc0e42e22018-05-11 11:36:10 +02001058 except Exception as e:
1059 # if not first_start is the first time after starting. So leave more time and wait
1060 # to allow kafka starts
gcalvinoed7f6d42018-12-14 14:44:56 +01001061 if self.consecutive_errors == 8 if not self.first_start else 30:
garciadeblas5697b8b2021-03-24 09:17:02 +01001062 self.logger.error(
1063 "Task kafka_read task exit error too many errors. Exception: {}".format(
1064 e
1065 )
1066 )
tiernoc0e42e22018-05-11 11:36:10 +02001067 raise
gcalvinoed7f6d42018-12-14 14:44:56 +01001068 self.consecutive_errors += 1
garciadeblas5697b8b2021-03-24 09:17:02 +01001069 self.logger.error(
1070 "Task kafka_read retrying after Exception {}".format(e)
1071 )
gcalvinoed7f6d42018-12-14 14:44:56 +01001072 wait_time = 2 if not self.first_start else 5
Gabriel Cubae7898982023-05-11 01:57:21 -05001073 await asyncio.sleep(wait_time)
tiernoc0e42e22018-05-11 11:36:10 +02001074
tiernoc0e42e22018-05-11 11:36:10 +02001075 self.logger.debug("Task kafka_read exit")
1076
Gabriel Cubae7898982023-05-11 01:57:21 -05001077 async def kafka_read_ping(self):
1078 await asyncio.gather(self.kafka_read(), self.kafka_ping())
1079
Mark Beierl1addc932023-05-18 15:11:34 -04001080 async def start(self):
rshri932105f2024-07-05 15:11:55 +00001081 self.logger.info("Start LCM")
tierno22f4f9c2018-06-11 18:53:39 +02001082 # check RO version
Mark Beierl1addc932023-05-18 15:11:34 -04001083 await self.check_RO_version()
tierno22f4f9c2018-06-11 18:53:39 +02001084
Gabriel Cubae7898982023-05-11 01:57:21 -05001085 self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
Luis Vegaa27dc532022-11-11 20:10:49 +00001086 # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
garciadeblas5697b8b2021-03-24 09:17:02 +01001087 self.netslice = netslice.NetsliceLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -05001088 self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
garciadeblas5697b8b2021-03-24 09:17:02 +01001089 )
Gabriel Cubae7898982023-05-11 01:57:21 -05001090 self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
1091 self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
1092 self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +01001093 self.k8scluster = vim_sdn.K8sClusterLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -05001094 self.msg, self.lcm_tasks, self.main_config.to_dict()
garciadeblas5697b8b2021-03-24 09:17:02 +01001095 )
Gabriel Cubae7898982023-05-11 01:57:21 -05001096 self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas5697b8b2021-03-24 09:17:02 +01001097 self.k8srepo = vim_sdn.K8sRepoLcm(
Gabriel Cubae7898982023-05-11 01:57:21 -05001098 self.msg, self.lcm_tasks, self.main_config.to_dict()
garciadeblas5697b8b2021-03-24 09:17:02 +01001099 )
rshri932105f2024-07-05 15:11:55 +00001100 self.cluster = k8s.ClusterLcm(
1101 self.msg, self.lcm_tasks, self.main_config.to_dict()
1102 )
1103 self.k8s_app = k8s.K8sAppLcm(
1104 self.msg, self.lcm_tasks, self.main_config.to_dict()
1105 )
1106 self.k8s_resource = k8s.K8sResourceLcm(
1107 self.msg, self.lcm_tasks, self.main_config.to_dict()
1108 )
1109 self.k8s_infra_controller = k8s.K8sInfraControllerLcm(
1110 self.msg, self.lcm_tasks, self.main_config.to_dict()
1111 )
1112 self.k8s_infra_config = k8s.K8sInfraConfigLcm(
1113 self.msg, self.lcm_tasks, self.main_config.to_dict()
1114 )
yshah771dea82024-07-05 15:11:49 +00001115 self.cloud_credentials = k8s.CloudCredentialsLcm(
1116 self.msg, self.lcm_tasks, self.main_config.to_dict()
1117 )
1118 self.oka = k8s.OkaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
1119 self.ksu = k8s.KsuLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
garciadeblas61a4c692025-07-17 13:04:13 +02001120 self.appinstance = k8s.AppInstanceLcm(
1121 self.msg, self.lcm_tasks, self.main_config.to_dict()
1122 )
yshah83a30572025-06-13 08:38:49 +00001123 self.nodegroup = k8s.NodeGroupLcm(
1124 self.msg, self.lcm_tasks, self.main_config.to_dict()
1125 )
rshri932105f2024-07-05 15:11:55 +00001126
1127 self.logger.info(
1128 "Msg: {} lcm tasks: {} main config: {}".format(
1129 self.msg, self.lcm_tasks, self.main_config
1130 )
1131 )
tierno2357f4e2020-10-19 16:38:59 +00001132
Mark Beierl1addc932023-05-18 15:11:34 -04001133 await self.kafka_read_ping()
bravof73bac502021-05-11 07:38:47 -04001134
tiernoc0e42e22018-05-11 11:36:10 +02001135 # TODO
1136 # self.logger.debug("Terminating cancelling creation tasks")
tiernoca2e16a2018-06-29 15:25:24 +02001137 # self.lcm_tasks.cancel("ALL", "create")
tiernoc0e42e22018-05-11 11:36:10 +02001138 # timeout = 200
1139 # while self.is_pending_tasks():
1140 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
Gabriel Cubae7898982023-05-11 01:57:21 -05001141 # await asyncio.sleep(2)
tiernoc0e42e22018-05-11 11:36:10 +02001142 # timeout -= 2
1143 # if not timeout:
tiernoca2e16a2018-06-29 15:25:24 +02001144 # self.lcm_tasks.cancel("ALL", "ALL")
tiernoc0e42e22018-05-11 11:36:10 +02001145 if self.db:
1146 self.db.db_disconnect()
1147 if self.msg:
1148 self.msg.disconnect()
tierno16427352019-04-22 11:37:36 +00001149 if self.msg_admin:
1150 self.msg_admin.disconnect()
tiernoc0e42e22018-05-11 11:36:10 +02001151 if self.fs:
1152 self.fs.fs_disconnect()
1153
tiernoc0e42e22018-05-11 11:36:10 +02001154 def read_config_file(self, config_file):
tiernoc0e42e22018-05-11 11:36:10 +02001155 try:
Gabriel Cubaa89a5a72022-11-26 18:55:15 -05001156 with open(config_file) as f:
1157 return yaml.safe_load(f)
tiernoc0e42e22018-05-11 11:36:10 +02001158 except Exception as e:
1159 self.logger.critical("At config file '{}': {}".format(config_file, e))
Gabriel Cubaa89a5a72022-11-26 18:55:15 -05001160 exit(1)
tiernoc0e42e22018-05-11 11:36:10 +02001161
tierno16427352019-04-22 11:37:36 +00001162 @staticmethod
1163 def get_process_id():
1164 """
1165 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
1166 will provide a random one
1167 :return: Obtained ID
1168 """
Gabriel Cuba4c0e6802023-10-09 13:22:38 -05001169
1170 def get_docker_id():
1171 try:
1172 with open("/proc/self/cgroup", "r") as f:
1173 text_id_ = f.readline()
1174 _, _, text_id = text_id_.rpartition("/")
1175 return text_id.replace("\n", "")[:12]
1176 except Exception:
1177 return None
1178
1179 def generate_random_id():
1180 return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
1181
1182 # Try getting docker id. If it fails, generate a random id
1183 docker_id = get_docker_id()
1184 return docker_id if docker_id else generate_random_id()
tierno16427352019-04-22 11:37:36 +00001185
tiernoc0e42e22018-05-11 11:36:10 +02001186
tierno275411e2018-05-16 14:33:32 +02001187def usage():
garciadeblas5697b8b2021-03-24 09:17:02 +01001188 print(
1189 """Usage: {} [options]
quilesj7e13aeb2019-10-08 13:34:55 +02001190 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
tiernoa9843d82018-10-24 10:44:20 +02001191 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
tierno275411e2018-05-16 14:33:32 +02001192 -h|--help: shows this help
garciadeblas5697b8b2021-03-24 09:17:02 +01001193 """.format(
1194 sys.argv[0]
1195 )
1196 )
tierno750b2452018-05-17 16:39:29 +02001197 # --log-socket-host HOST: send logs to this host")
1198 # --log-socket-port PORT: send logs using this port (default: 9022)")
tierno275411e2018-05-16 14:33:32 +02001199
1200
garciadeblas5697b8b2021-03-24 09:17:02 +01001201if __name__ == "__main__":
tierno275411e2018-05-16 14:33:32 +02001202 try:
tierno8c16b052020-02-05 15:08:32 +00001203 # print("SYS.PATH='{}'".format(sys.path))
tierno275411e2018-05-16 14:33:32 +02001204 # load parameters and configuration
quilesj7e13aeb2019-10-08 13:34:55 +02001205 # -h
1206 # -c value
1207 # --config value
1208 # --help
1209 # --health-check
garciadeblas5697b8b2021-03-24 09:17:02 +01001210 opts, args = getopt.getopt(
1211 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
1212 )
tierno275411e2018-05-16 14:33:32 +02001213 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
1214 config_file = None
1215 for o, a in opts:
1216 if o in ("-h", "--help"):
1217 usage()
1218 sys.exit()
1219 elif o in ("-c", "--config"):
1220 config_file = a
tiernoa9843d82018-10-24 10:44:20 +02001221 elif o == "--health-check":
tierno94f06112020-02-11 12:38:19 +00001222 from osm_lcm.lcm_hc import health_check
garciadeblas5697b8b2021-03-24 09:17:02 +01001223
aticig56b86c22022-06-29 10:43:05 +03001224 health_check(config_file, Lcm.ping_interval_pace)
tierno275411e2018-05-16 14:33:32 +02001225 else:
Gabriel Cuba4c0e6802023-10-09 13:22:38 -05001226 print(f"Unhandled option: {o}")
1227 exit(1)
quilesj7e13aeb2019-10-08 13:34:55 +02001228
tierno275411e2018-05-16 14:33:32 +02001229 if config_file:
1230 if not path.isfile(config_file):
garciadeblas5697b8b2021-03-24 09:17:02 +01001231 print(
1232 "configuration file '{}' does not exist".format(config_file),
1233 file=sys.stderr,
1234 )
tierno275411e2018-05-16 14:33:32 +02001235 exit(1)
1236 else:
garciadeblas5697b8b2021-03-24 09:17:02 +01001237 for config_file in (
1238 __file__[: __file__.rfind(".")] + ".cfg",
1239 "./lcm.cfg",
1240 "/etc/osm/lcm.cfg",
1241 ):
tierno275411e2018-05-16 14:33:32 +02001242 if path.isfile(config_file):
1243 break
1244 else:
garciadeblas5697b8b2021-03-24 09:17:02 +01001245 print(
1246 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
1247 file=sys.stderr,
1248 )
tierno275411e2018-05-16 14:33:32 +02001249 exit(1)
k4.rahul44306072023-05-05 14:24:31 +05301250 config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file)))
tierno275411e2018-05-16 14:33:32 +02001251 lcm = Lcm(config_file)
Mark Beierl1addc932023-05-18 15:11:34 -04001252 asyncio.run(lcm.start())
tierno22f4f9c2018-06-11 18:53:39 +02001253 except (LcmException, getopt.GetoptError) as e:
tierno275411e2018-05-16 14:33:32 +02001254 print(str(e), file=sys.stderr)
1255 # usage()
1256 exit(1)