blob: 5f630b23811bcefc6a84aff7cda7212eb1de4bde [file] [log] [blame]
tiernoc0e42e22018-05-11 11:36:10 +02001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
tierno2e215512018-11-28 09:37:52 +00004##
5# Copyright 2018 Telefonica S.A.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18##
19
quilesj7e13aeb2019-10-08 13:34:55 +020020
21# DEBUG WITH PDB
22import os
23import pdb
24
tiernoc0e42e22018-05-11 11:36:10 +020025import asyncio
26import yaml
tierno275411e2018-05-16 14:33:32 +020027import logging
28import logging.handlers
29import getopt
tierno275411e2018-05-16 14:33:32 +020030import sys
tierno59d22d22018-09-25 18:10:19 +020031
bravof73bac502021-05-11 07:38:47 -040032from osm_lcm import ns, vim_sdn, netslice
tierno69f0d382020-05-07 13:08:09 +000033from osm_lcm.ng_ro import NgRoException, NgRoClient
34from osm_lcm.ROclient import ROClient, ROClientException
quilesj7e13aeb2019-10-08 13:34:55 +020035
tierno94f06112020-02-11 12:38:19 +000036from time import time
tierno8069ce52019-08-28 15:34:33 +000037from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
tiernoa4dea5a2020-01-05 16:29:30 +000038from osm_lcm import version as lcm_version, version_date as lcm_version_date
tierno8069ce52019-08-28 15:34:33 +000039
bravof922c4172020-11-24 21:21:43 -030040from osm_common import msglocal, msgkafka
tierno98768132018-09-11 12:07:21 +020041from osm_common import version as common_version
tierno59d22d22018-09-25 18:10:19 +020042from osm_common.dbbase import DbException
tiernoc0e42e22018-05-11 11:36:10 +020043from osm_common.fsbase import FsException
44from osm_common.msgbase import MsgException
bravof922c4172020-11-24 21:21:43 -030045from osm_lcm.data_utils.database.database import Database
46from osm_lcm.data_utils.filesystem.filesystem import Filesystem
aticig3abb0ba2022-06-29 10:43:05 +030047from osm_lcm.lcm_hc import get_health_check_file
tierno275411e2018-05-16 14:33:32 +020048from os import environ, path
tierno16427352019-04-22 11:37:36 +000049from random import choice as random_choice
tierno59d22d22018-09-25 18:10:19 +020050from n2vc import version as n2vc_version
bravof922c4172020-11-24 21:21:43 -030051import traceback
tiernoc0e42e22018-05-11 11:36:10 +020052
# Optional debugging hook: when the OSMLCM_PDB_DEBUG environment variable is defined (with any value,
# even an empty one) the debugger is entered while this module is being loaded.
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum accepted versions of the companion components. They are compared using versiontuple():
# the RO one in Lcm.check_RO_version(); the N2VC and osm_common ones in Lcm.__init__()
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
tierno275411e2018-05-16 14:33:32 +020062
63
class Lcm:
    """
    Worker that connects to the database, the filesystem storage and the message bus, and launches the
    tasks that attend the commands read from that bus.
    """

    ping_interval_pace = (
        120  # seconds between pings once it is confirmed that own pings are received back
    )
    ping_interval_boot = 5  # seconds between pings while booting (no own ping received back yet)
    cfg_logger_name = {
        "message": "lcm.msg",
        "database": "lcm.db",
        "storage": "lcm.fs",
        "tsdb": "lcm.prometheus",
    }
    # ^ logger name used for each one of these sections of lcm.cfg
tiernoa9843d82018-10-24 10:44:20 +020077
def __init__(self, config_file, loop=None):
    """
    Init, Connect to database, filesystem storage, and messaging

    :param config_file: configuration file. It is loaded with self.read_config_file() into a two level
        dictionary; the sections used here are 'global', 'RO', 'database', 'storage', 'message' and 'tsdb'
    :param loop: asyncio event loop to use. If not provided, asyncio.get_event_loop() is used
    :return: None
    :raises LcmException: if the N2VC or osm_common version is lower than the required one, if the message
        driver is not 'local' or 'kafka', or if the connection to database, storage or messaging fails
    """
    self.db = None
    self.msg = None
    self.msg_admin = None
    self.fs = None
    self.pings_not_received = 1
    self.consecutive_errors = 0
    self.first_start = False

    # logging
    self.logger = logging.getLogger("lcm")
    # get id
    self.worker_id = self.get_process_id()
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.health_check_file = get_health_check_file(self.config)
    self.config["ro_config"] = {
        "ng": config["RO"].get("ng", False),
        "uri": config["RO"].get("uri"),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.roclient",
        "loglevel": config["RO"].get("loglevel", "ERROR"),
    }
    ro_uri = self.config["ro_config"]["uri"]
    if not ro_uri:
        self.config["ro_config"]["uri"] = "http://{}:{}/".format(
            config["RO"]["host"], config["RO"]["port"]
        )
    elif ro_uri.endswith(("/ro", "/ro/", "/openmano", "/openmano/")):
        # Keep only the base uri, up to and including the '/' that precedes the suffix, because
        # check_RO_version() appends 'ro' or 'openmano' by itself.
        # The last character is skipped in the search so that a trailing '/' is not the one found
        index = ro_uri[:-1].rfind("/")
        self.config["ro_config"]["uri"] = ro_uri[: index + 1]

    self.loop = loop or asyncio.get_event_loop()
    # handlers of each topic, used by kafka_read_callback(). They are not created in this method
    self.ns = None
    self.netslice = None
    self.vim = None
    self.wim = None
    self.sdn = None
    self.k8scluster = None
    self.vca = None
    self.k8srepo = None

    # logging
    log_format_simple = (
        "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    )
    log_formatter_simple = logging.Formatter(
        log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
    )
    config["database"]["logger_name"] = "lcm.db"
    config["storage"]["logger_name"] = "lcm.fs"
    config["message"]["logger_name"] = "lcm.msg"
    if config["global"].get("logfile"):
        file_handler = logging.handlers.RotatingFileHandler(
            config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
        )
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    if not config["global"].get("nologging"):
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules
    for k1, logname in self.cfg_logger_name.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if config[k1].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(
                config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
            )
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if config[k1].get("loglevel"):
            logger_module.setLevel(config[k1]["loglevel"])
    self.logger.critical(
        "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
    )

    # check version of N2VC
    # TODO enhance with int conversion or from distutils.version import LooseVersion
    # or with list(map(int, version.split(".")))
    if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
        raise LcmException(
            "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version
            )
        )
    # check version of common
    if versiontuple(common_version) < versiontuple(min_common_version):
        raise LcmException(
            "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version
            )
        )

    try:
        self.db = Database(config).instance.db

        self.fs = Filesystem(config).instance.fs
        self.fs.sync()

        # copy message configuration in order to remove 'group_id' for msg_admin
        config_message = config["message"].copy()
        config_message["loop"] = self.loop
        if config_message["driver"] == "local":
            msg_class = msglocal.MsgLocal
        elif config_message["driver"] == "kafka":
            msg_class = msgkafka.MsgKafka
        else:
            raise LcmException(
                "Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]
                )
            )
        self.msg = msg_class()
        self.msg.connect(config_message)
        # the admin connection uses the same configuration but without 'group_id'
        self.msg_admin = msg_class()
        config_message.pop("group_id", None)
        self.msg_admin.connect(config_message)
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e)) from e

    # contains created tasks/futures to be able to cancel
    self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
kuused124bfe2019-06-18 12:09:24 +0200216
async def check_RO_version(self):
    """
    Check that RO can be contacted and that its version is not lower than min_RO_version.

    On every attempt the new generation RO is tried first (NgRoClient at '<uri>ro') and, if that fails for
    whatever reason, the old one (ROClient at '<uri>openmano'). self.config["ro_config"]["ng"] is set to
    True or False depending on which one has answered, and self.config["ro_config"]["uri"] is left
    pointing to it. Client errors are retried every 5 seconds, up to 14 attempts.

    :return: None
    :raises LcmException: if the RO version is not compatible, or if RO cannot be contacted after all the
        attempts
    """
    attempts_left = 14
    reported_error = None
    while True:
        base_uri = self.config["ro_config"]["uri"]
        try:
            # try new RO, if fail old RO
            try:
                self.config["ro_config"]["uri"] = base_uri + "ro"
                ro_client = NgRoClient(self.loop, **self.config["ro_config"])
                ro_version = await ro_client.get_version()
                self.config["ro_config"]["ng"] = True
            except Exception:
                self.config["ro_config"]["uri"] = base_uri + "openmano"
                ro_client = ROClient(self.loop, **self.config["ro_config"])
                ro_version = await ro_client.get_version()
                self.config["ro_config"]["ng"] = False

            if versiontuple(ro_version) < versiontuple(min_RO_version):
                incompatible_text = "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                    ro_version, min_RO_version
                )
                raise LcmException(incompatible_text)

            connected_text = "Connected to RO version {} new-generation version {}".format(
                ro_version, self.config["ro_config"]["ng"]
            )
            self.logger.info(connected_text)
            return
        except (ROClientException, NgRoException) as e:
            # restore the base uri, so that the suffix is not appended twice at next attempt
            self.config["ro_config"]["uri"] = base_uri
            attempts_left -= 1
            traceback.print_tb(e.__traceback__)
            error_text = "Error while connecting to RO on {}: {}".format(
                self.config["ro_config"]["uri"], e
            )
            if attempts_left <= 0:
                self.logger.critical(error_text)
                raise LcmException(error_text)
            # the same error is logged only once
            if error_text != reported_error:
                reported_error = error_text
                remaining_seconds = 5 * attempts_left
                self.logger.error(
                    error_text + ". Waiting until {} seconds".format(remaining_seconds)
                )
            await asyncio.sleep(5)
tierno22f4f9c2018-06-11 18:53:39 +0200262
async def test(self, param=None):
    """
    Dummy task, launched by kafka_read_callback() when the command 'test' is received. It only logs
    the received content at debug level.

    :param param: content to be logged
    :return: None
    """
    text = "Starting/Ending test task: {}".format(param)
    self.logger.debug(text)
265
async def kafka_ping(self):
    """
    Endless task that writes a ping for this worker at topic 'admin' and checks that pings come back.

    self.pings_not_received is increased on every ping sent; kafka_read_callback() sets it to 0 when a
    ping of this worker is received. Pings are sent every self.ping_interval_boot seconds until the first
    own ping has been received, and every self.ping_interval_pace seconds afterwards.

    :return: it does not return; it ends by raising an exception
    :raises LcmException: if more than 10 consecutive pings have not been received back
    :raises Exception: the error obtained when sending, once it has failed too many consecutive times
        (30 before the first successful cycle, 8 afterwards)
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg_admin.aiowrite(
                "admin",
                "ping",
                {
                    "from": "lcm",
                    "to": "lcm",
                    "worker_id": self.worker_id,
                    "version": lcm_version,
                },
                self.loop,
            )
            # time between pings are low when it is not received and at starting
            wait_time = (
                self.ping_interval_boot
                if not kafka_has_received
                else self.ping_interval_pace
            )
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # no 'loop' argument: it is deprecated since python 3.8 and not accepted since python 3.10
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # if first_start, no cycle has been completed since starting. So more errors are
            # tolerated and more time is waited, in order to allow kafka to start.
            # The limit is computed apart: "a == 8 if not b else 30" is parsed as
            # "(a == 8) if not b else 30", which is always true while first_start
            max_consecutive_errors = 30 if first_start else 8
            if consecutive_errors >= max_consecutive_errors:
                self.logger.error(
                    "Task kafka_ping task exit error too many errors. Exception: {}".format(
                        e
                    )
                )
                raise
            consecutive_errors += 1
            self.logger.error(
                "Task kafka_ping retrying after Exception {}".format(e)
            )
            wait_time = 2 if not first_start else 5
            await asyncio.sleep(wait_time)
317
def kafka_read_callback(self, topic, command, params):
    """
    Attend one message read from the bus, launching and registering the task that processes it.

    :param topic: topic where the message has been read: 'admin', 'pla', 'k8scluster', 'vca', 'k8srepo',
        'ns', 'nsi', 'vim_account', 'wim_account' or 'sdn'
    :param command: command of the message. 'exit', 'echo', 'test' and those starting with '#' are
        attended regardless of the topic
    :param params: content of the message. It is indexed as a dictionary for most of the commands; for
        'show' it is used directly as the identifier of the item
    :return: None. Messages that are not recognized are just logged as critical
    :raises LcmExceptionExit: when the command 'exit' is received
    """
    order_id = 1

    if topic != "admin" and command != "ping":
        self.logger.debug(
            "Task kafka_read receives {} {}: {}".format(topic, command, params)
        )
    self.consecutive_errors = 0
    self.first_start = False
    order_id += 1

    # commands that do not depend on the topic
    if command == "exit":
        raise LcmExceptionExit
    if command.startswith("#"):
        return
    if command == "echo":
        # just for test
        print(params)
        sys.stdout.flush()
        return
    if command == "test":
        asyncio.Task(self.test(params), loop=self.loop)
        return

    if topic == "admin":
        if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
            # pings sent by other workers are ignored
            if params.get("worker_id") != self.worker_id:
                return
            self.pings_not_received = 0
            try:
                with open(self.health_check_file, "w") as f:
                    f.write(str(time()))
            except Exception as e:
                self.logger.error(
                    "Cannot write into '{}' for healthcheck: {}".format(
                        self.health_check_file, e
                    )
                )
            return
    elif topic == "pla":
        if command == "placement":
            self.ns.update_nsrs_with_pla_result(params)
            return
    elif topic in ("k8scluster", "vca", "k8srepo"):
        # these handlers are stored at an attribute named as the topic, and they are attended in the
        # same way: <handler>.create/delete(params, order_id), registered as '<topic>_<operation>'
        if command in ("create", "created"):
            operation = "create"
        elif command in ("delete", "deleted"):
            operation = "delete"
        else:
            operation = None
        if operation is not None:
            item_id = params.get("_id")
            if topic == "k8srepo" and operation == "create":
                self.logger.debug("k8srepo_id = {}".format(item_id))
            handler = getattr(self, topic)
            task = asyncio.ensure_future(
                getattr(handler, operation)(params, order_id)
            )
            self.lcm_tasks.register(
                topic, item_id, order_id, "{}_{}".format(topic, operation), task
            )
            return
    elif topic == "ns":
        # command -> name of the method of self.ns that attends it. All of them receive
        # (nsr_id, nslcmop_id) and are registered as 'ns_<command>'
        ns_operations = {
            "instantiate": "instantiate",
            "terminate": "terminate",
            "vca_status_refresh": "vca_status_refresh",
            "action": "action",
            "update": "update",
            "scale": "scale",
            "heal": "heal",
            "migrate": "migrate",
            "verticalscale": "vertical_scale",
        }
        ns_method = ns_operations.get(command)
        if ns_method is not None:
            nslcmop = params
            nslcmop_id = nslcmop["_id"]
            nsr_id = nslcmop["nsInstanceId"]
            if command == "terminate":
                # cancel the tasks of this ns before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
            task = asyncio.ensure_future(
                getattr(self.ns, ns_method)(nsr_id, nslcmop_id)
            )
            if command == "verticalscale":
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
            self.lcm_tasks.register(
                "ns", nsr_id, nslcmop_id, "ns_{}".format(command), task
            )
            if command == "verticalscale":
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
            return
        if command == "show":
            nsr_id = params
            try:
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                # NOTE(review): 'lcm_ns_tasks' is not created at __init__; a plain access raised
                # AttributeError, which was reported as "nsr ... not found". Confirm whether the
                # tasks should be obtained from self.lcm_tasks instead
                print(
                    "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                    "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                    "".format(
                        nsr_id,
                        db_nsr["operational-status"],
                        db_nsr["config-status"],
                        db_nsr["detailed-status"],
                        db_nsr["_admin"]["deployed"],
                        getattr(self, "lcm_ns_tasks", {}).get(nsr_id),
                    )
                )
            except Exception as e:
                print("nsr {} not found: {}".format(nsr_id, e))
            sys.stdout.flush()
            return
        if command == "deleted":
            return  # TODO cleaning of task just in case should be done
        if command in (
            "vnf_terminated",
            "policy_updated",
            "terminated",
            "instantiated",
            "scaled",
            "healed",
            "actioned",
            "updated",
            "migrated",
            "verticalscaled",
        ):  # "scaled-cooldown-time"
            return

    elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
        if command in ("instantiate", "terminate"):
            nsilcmop = params
            nsilcmop_id = nsilcmop["_id"]  # slice operation id
            nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
            if command == "terminate":
                self.lcm_tasks.cancel(topic, nsir_id)
            task = asyncio.ensure_future(
                getattr(self.netslice, command)(nsir_id, nsilcmop_id)
            )
            self.lcm_tasks.register(
                "nsi", nsir_id, nsilcmop_id, "nsi_{}".format(command), task
            )
            return
        if command == "show":
            nsir_id = params
            try:
                db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                # NOTE(review): same as 'lcm_ns_tasks' for the topic 'ns'
                print(
                    "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                    "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                    "".format(
                        nsir_id,
                        db_nsir["operational-status"],
                        db_nsir["config-status"],
                        db_nsir["detailed-status"],
                        db_nsir["_admin"]["deployed"],
                        getattr(self, "lcm_netslice_tasks", {}).get(nsir_id),
                    )
                )
            except Exception as e:
                print("nsir {} not found: {}".format(nsir_id, e))
            sys.stdout.flush()
            return
        if command == "deleted":
            return  # TODO cleaning of task just in case should be done
        if command in (
            "terminated",
            "instantiated",
            "scaled",
            "healed",
            "actioned",
        ):  # "scaled-cooldown-time"
            return
    elif topic == "vim_account":
        vim_id = params["_id"]
        if command in ("create", "created"):
            # nothing is launched when the new generation RO is used
            if not self.config["ro_config"].get("ng"):
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_create", task
                )
            return
        elif command in ("delete", "deleted"):
            self.lcm_tasks.cancel(topic, vim_id)
            task = asyncio.ensure_future(self.vim.delete(params, order_id))
            self.lcm_tasks.register(
                "vim_account", vim_id, order_id, "vim_delete", task
            )
            return
        elif command == "show":
            print("not implemented show with vim_account")
            sys.stdout.flush()
            return
        elif command in ("edit", "edited"):
            if not self.config["ro_config"].get("ng"):
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_edit", task
                )
            return
    elif topic == "wim_account":
        wim_id = params["_id"]
        if command in ("create", "created"):
            if not self.config["ro_config"].get("ng"):
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_create", task
                )
            return
        elif command in ("delete", "deleted"):
            self.lcm_tasks.cancel(topic, wim_id)
            task = asyncio.ensure_future(self.wim.delete(params, order_id))
            self.lcm_tasks.register(
                "wim_account", wim_id, order_id, "wim_delete", task
            )
            return
        elif command == "show":
            print("not implemented show with wim_account")
            sys.stdout.flush()
            return
        elif command in ("edit", "edited"):
            task = asyncio.ensure_future(self.wim.edit(params, order_id))
            self.lcm_tasks.register(
                "wim_account", wim_id, order_id, "wim_edit", task
            )
            return
    elif topic == "sdn":
        _sdn_id = params["_id"]
        if command in ("create", "created"):
            if not self.config["ro_config"].get("ng"):
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register(
                    "sdn", _sdn_id, order_id, "sdn_create", task
                )
            return
        elif command in ("delete", "deleted"):
            self.lcm_tasks.cancel(topic, _sdn_id)
            task = asyncio.ensure_future(self.sdn.delete(params, order_id))
            self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
            return
        elif command in ("edit", "edited"):
            task = asyncio.ensure_future(self.sdn.edit(params, order_id))
            self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
            return
    # reached by any message that has not been attended by a branch above
    self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
653
async def kafka_read(self):
    """
    Read the LCM kafka topics and dispatch every message to kafka_read_callback.

    Two readers are awaited together: self.msg for the regular topics and
    self.msg_admin for the "admin" topic. If they fail, the read is retried
    after a pause until too many consecutive errors have happened.

    While self.first_start is still True (this method sets it on entry) up to
    30 consecutive errors are tolerated, waiting 5 seconds between attempts,
    to leave time for kafka to start. Otherwise up to 8 consecutive errors are
    tolerated, waiting 2 seconds between attempts.

    :return: None, once LcmExceptionExit is received
    :raises Exception: re-raises the last read error when the error limit is reached
    """
    self.logger.debug(
        "Task kafka_read Enter with worker_id={}".format(self.worker_id)
    )
    # future = asyncio.Future()
    self.consecutive_errors = 0
    self.first_start = True
    topics = (
        "ns",
        "vim_account",
        "wim_account",
        "sdn",
        "nsi",
        "k8scluster",
        "vca",
        "k8srepo",
        "pla",
    )
    topics_admin = ("admin",)
    # The loop only ends by 'break' (ordered exit) or 'raise' (too many errors)
    while True:
        try:
            await asyncio.gather(
                self.msg.aioread(
                    topics, self.loop, self.kafka_read_callback, from_beginning=True
                ),
                self.msg_admin.aioread(
                    topics_admin,
                    self.loop,
                    self.kafka_read_callback,
                    group_id=False,
                ),
            )

        except LcmExceptionExit:
            self.logger.debug("Bye!")
            break
        except Exception as e:
            # On first start leave more time and retries to allow kafka starts.
            # The limit is computed apart: "a == 8 if cond else 30" parses as
            # "(a == 8) if cond else 30", which is always true on first start.
            max_errors = 30 if self.first_start else 8
            if self.consecutive_errors >= max_errors:
                self.logger.error(
                    "Task kafka_read task exit error too many errors. Exception: {}".format(
                        e
                    )
                )
                raise
            self.consecutive_errors += 1
            self.logger.error(
                "Task kafka_read retrying after Exception {}".format(e)
            )
            wait_time = 5 if self.first_start else 2
            # 'loop' argument is not passed: it was removed from asyncio.sleep
            # in python 3.10 and the running loop is used anyway
            await asyncio.sleep(wait_time)

    # self.logger.debug("Task kafka_read terminating")
    self.logger.debug("Task kafka_read exit")
709
def start(self):
    """
    Run the LCM until the kafka tasks finish, then release the resources.

    Steps: check the RO version, create one handler object per kind of item
    (ns, netslice, vim, wim, sdn, k8scluster, vca, k8srepo), run kafka_read and
    kafka_ping together on self.loop and, when both end, close the loop and
    disconnect database, message buses and storage.
    """
    # check RO version
    self.loop.run_until_complete(self.check_RO_version())

    # all the handlers are built with the same first four arguments
    handler_args = (self.msg, self.lcm_tasks, self.config, self.loop)
    self.ns = ns.NsLcm(*handler_args)
    # netslice handler additionally receives the ns handler
    self.netslice = netslice.NetsliceLcm(*handler_args, self.ns)
    self.vim = vim_sdn.VimLcm(*handler_args)
    self.wim = vim_sdn.WimLcm(*handler_args)
    self.sdn = vim_sdn.SdnLcm(*handler_args)
    self.k8scluster = vim_sdn.K8sClusterLcm(*handler_args)
    self.vca = vim_sdn.VcaLcm(*handler_args)
    self.k8srepo = vim_sdn.K8sRepoLcm(*handler_args)

    kafka_tasks = asyncio.gather(self.kafka_read(), self.kafka_ping())
    self.loop.run_until_complete(kafka_tasks)

    # TODO
    # self.logger.debug("Terminating cancelling creation tasks")
    # self.lcm_tasks.cancel("ALL", "create")
    # timeout = 200
    # while self.is_pending_tasks():
    #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
    #     await asyncio.sleep(2, loop=self.loop)
    #     timeout -= 2
    #     if not timeout:
    #         self.lcm_tasks.cancel("ALL", "ALL")
    self.loop.close()
    self.loop = None

    # (attribute holding the connection, name of its disconnect method)
    connections = (
        ("db", "db_disconnect"),
        ("msg", "disconnect"),
        ("msg_admin", "disconnect"),
        ("fs", "fs_disconnect"),
    )
    for attribute, disconnect_method in connections:
        connection = getattr(self, attribute)
        if connection:
            getattr(connection, disconnect_method)()
754
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and overwrite it with the environment.

    Every environment variable named OSMLCM_<section>_<item> sets
    conf[<section>][<item>], both in lower case except sections "RO" and "VCA".
    Values are stored as text, but as integer for item "port" and for any item
    of section "timeout". Old names of VCA items are renamed to the current ones.

    :param config_file: path of the configuration file
    :return: dictionary with the configuration. On any error it is logged as
        critical and the process exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        # read file as yaml format
        # NOTE: yaml.Loader can build arbitrary python objects, so this file must be trusted
        with open(config_file) as yaml_file:
            conf = yaml.load(yaml_file, Loader=yaml.Loader)

        # Ensure all sections are not empty
        sections = ("global", "timeout", "RO", "VCA", "database", "storage", "message")
        for section in sections:
            if not conf.get(section):
                conf[section] = {}

        # read all environ that starts with OSMLCM_
        for env_name, env_value in environ.items():
            if not env_name.startswith("OSMLCM_"):
                continue
            section, _, item = env_name[7:].lower().partition("_")
            if not item:
                continue
            if section in ("ro", "vca"):
                # put in capital letter
                section = section.upper()
            try:
                is_integer = item == "port" or section == "timeout"
                conf[section][item] = int(env_value) if is_integer else env_value
            except Exception as e:
                self.logger.warning(
                    "skipping environ '{}' on exception '{}'".format(env_name, e)
                )

        # backward compatibility of VCA parameters
        vca_conf = conf["VCA"]
        old_to_new_names = (
            ("pubkey", "public_key"),
            ("cacert", "ca_cert"),
            ("apiproxy", "api_proxy"),
            ("enableosupgrade", "enable_os_upgrade"),
            ("aptmirror", "apt_mirror"),
        )
        for old_name, new_name in old_to_new_names:
            if old_name in vca_conf:
                vca_conf[new_name] = vca_conf.pop(old_name)

        # enable_os_upgrade provided as text "true"/"false" is converted to boolean
        os_upgrade = vca_conf.get("enable_os_upgrade")
        if isinstance(os_upgrade, str):
            text_to_bool = {"false": False, "true": True}
            os_upgrade_text = os_upgrade.lower()
            if os_upgrade_text in text_to_bool:
                vca_conf["enable_os_upgrade"] = text_to_bool[os_upgrade_text]

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        exit(1)
820
@staticmethod
def get_process_id():
    """
    Obtain an ID for this process. It is taken from the first line of /proc/self/cgroup
    (text after the last "/", at most 12 characters), that is the docker ID when running
    inside docker. If it cannot be obtained, a random one is provided
    :return: Obtained ID, a text of up to 12 characters
    """
    # Try getting docker id. If fails or it is empty, a random id is generated
    try:
        with open("/proc/self/cgroup", "r") as cgroup_file:
            first_line = cgroup_file.readline()
        docker_id = first_line.rsplit("/", 1)[-1].replace("\n", "")[:12]
        if docker_id:
            return docker_id
    except Exception:
        pass
    # Return a random id of 12 hexadecimal digits
    hex_digits = "0123456789abcdef"
    return "".join(random_choice(hex_digits) for _ in range(12))
tierno16427352019-04-22 11:37:36 +0000840
tiernoc0e42e22018-05-11 11:36:10 +0200841
def usage():
    """Print the command line help, using the name this program was invoked with."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """
    print(help_text.format(sys.argv[0]))
    # options not implemented yet:
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
tierno275411e2018-05-16 14:33:32 +0200854
855
if __name__ == "__main__":
    # Entry point: parse the command line, locate the configuration file and run the LCM.
    # Errors on options or LcmException are printed to stderr and the exit code is 1.
    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration. Accepted options:
        # -h | --help
        # -c value | --config value
        # --health-check
        short_options = "hc:"
        long_options = ["config=", "help", "health-check"]
        opts, args = getopt.getopt(sys.argv[1:], short_options, long_options)
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for option, value in opts:
            if option in ("-h", "--help"):
                usage()
                sys.exit()
            elif option in ("-c", "--config"):
                config_file = value
            elif option == "--health-check":
                from osm_lcm.lcm_hc import health_check

                # it receives the configuration file provided by a previous option, if any
                health_check(config_file, Lcm.ping_interval_pace)
            # elif option == "--log-socket-port":
            #     log_socket_port = value
            # elif option == "--log-socket-host":
            #     log_socket_host = value
            # elif option == "--log-file":
            #     log_file = value
            else:
                assert False, "Unhandled option"

        if not config_file:
            # not provided: use the first existing file among the default locations
            default_locations = (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            )
            config_file = next(
                (location for location in default_locations if path.isfile(location)),
                None,
            )
            if config_file is None:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        elif not path.isfile(config_file):
            print(
                "configuration file '{}' does not exist".format(config_file),
                file=sys.stderr,
            )
            exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)