Ubuntu 22.04 and Python 3.10 preparation
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from osm_lcm.data_utils.lcm_config import LcmCfg
48 from osm_lcm.lcm_hc import get_health_check_file
49 from os import path
50 from random import choice as random_choice
51 from n2vc import version as n2vc_version
52 import traceback
53
# Drop into the Python debugger at import time when the env var is set
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of companion OSM components this LCM is compatible with;
# checked in check_RO_version() and in Lcm.__init__
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
63
64
class Lcm:
    """OSM Life Cycle Management module: consumes operation requests from the
    Kafka bus and launches the proper handler task for each one."""

    # seconds between pings once the bus is confirmed to be working / at boot
    ping_interval_pace = (
        120  # how many time ping is send once is confirmed all is running
    )
    ping_interval_boot = 5  # how many time ping is sent when booting

    # NOTE: class attribute shared by all instances; mutated in __init__ via
    # set_from_dict(), so effectively per-process singleton configuration
    main_config = LcmCfg()
    def __init__(self, config_file):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path to the YAML configuration file (two-level dict:
            top level should contain 'database', 'storage', 'message', ...)
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        # critical level so the effective configuration is always logged
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        # per-topic handlers; created later in start()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            # NOTE(review): get_event_loop() outside a running loop is deprecated
            # since Python 3.10 — confirm the message drivers still need it
            config_message["loop"] = asyncio.get_event_loop()
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
195
    async def check_RO_version(self):
        """Connect to RO (Resource Orchestrator) and check version compatibility.

        Tries the new-generation RO API first (uri + "ro"); on failure falls
        back to the classic one (uri + "openmano"), recording which flavour
        worked in self.main_config.RO.ng.  Retries up to 14 times with a 5 s
        pause between attempts; raises LcmException if RO is unreachable or its
        version is below min_RO_version.
        """
        tries = 14
        last_error = None
        while True:
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the original uri before retrying
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # only log when the error text changes, to avoid flooding
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
243
244 async def test(self, param=None):
245 self.logger.debug("Starting/Ending test task: {}".format(param))
246
247 async def kafka_ping(self):
248 self.logger.debug("Task kafka_ping Enter")
249 consecutive_errors = 0
250 first_start = True
251 kafka_has_received = False
252 self.pings_not_received = 1
253 while True:
254 try:
255 await self.msg_admin.aiowrite(
256 "admin",
257 "ping",
258 {
259 "from": "lcm",
260 "to": "lcm",
261 "worker_id": self.worker_id,
262 "version": lcm_version,
263 },
264 )
265 # time between pings are low when it is not received and at starting
266 wait_time = (
267 self.ping_interval_boot
268 if not kafka_has_received
269 else self.ping_interval_pace
270 )
271 if not self.pings_not_received:
272 kafka_has_received = True
273 self.pings_not_received += 1
274 await asyncio.sleep(wait_time)
275 if self.pings_not_received > 10:
276 raise LcmException("It is not receiving pings from Kafka bus")
277 consecutive_errors = 0
278 first_start = False
279 except LcmException:
280 raise
281 except Exception as e:
282 # if not first_start is the first time after starting. So leave more time and wait
283 # to allow kafka starts
284 if consecutive_errors == 8 if not first_start else 30:
285 self.logger.error(
286 "Task kafka_read task exit error too many errors. Exception: {}".format(
287 e
288 )
289 )
290 raise
291 consecutive_errors += 1
292 self.logger.error(
293 "Task kafka_read retrying after Exception {}".format(e)
294 )
295 wait_time = 2 if not first_start else 5
296 await asyncio.sleep(wait_time)
297
    def kafka_read_callback(self, topic, command, params):
        """Dispatch one message received from the Kafka bus.

        Invoked by the message driver for every (topic, command, params).  For
        LCM operations it schedules an asyncio task on the proper handler
        (self.ns, self.netslice, self.vim, ...) and registers it in
        self.lcm_tasks so it can be tracked and cancelled.

        :param topic: kafka topic the message arrived on ("ns", "nsi", "admin", ...)
        :param command: requested operation ("instantiate", "delete", "ping", ...)
        :param params: message payload, normally a dict with the operation data
        :return: None
        """
        order_id = 1

        # pings are too chatty to log
        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # any received message proves the bus is alive: reset error counters
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # global commands, valid on any topic
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            # commented-out command: ignore
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params))
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    # ping addressed to another LCM worker: ignore
                    return
                self.pings_not_received = 0
                try:
                    # touch the health-check file so external probes see us alive
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            # network-service lifecycle operations: params is the nslcmop record
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any in-flight task for this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                # debugging aid: dump the NS record to stdout; params is the nsr id
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # completion notifications: nothing to do
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # debugging aid: dump the slice record to stdout; params is the nsir id
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with new-generation RO the account is handled by RO itself
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" is already matched above
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" is already matched above
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" is already matched above
                return  # TODO cleaning of task just in case should be done
        # fell through every known (topic, command) combination
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
649
650 async def kafka_read(self):
651 self.logger.debug(
652 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
653 )
654 # future = asyncio.Future()
655 self.consecutive_errors = 0
656 self.first_start = True
657 while self.consecutive_errors < 10:
658 try:
659 topics = (
660 "ns",
661 "vim_account",
662 "wim_account",
663 "sdn",
664 "nsi",
665 "k8scluster",
666 "vca",
667 "k8srepo",
668 "pla",
669 )
670 topics_admin = ("admin",)
671 await asyncio.gather(
672 self.msg.aioread(
673 topics, self.kafka_read_callback, from_beginning=True
674 ),
675 self.msg_admin.aioread(
676 topics_admin,
677 self.kafka_read_callback,
678 group_id=False,
679 ),
680 )
681
682 except LcmExceptionExit:
683 self.logger.debug("Bye!")
684 break
685 except Exception as e:
686 # if not first_start is the first time after starting. So leave more time and wait
687 # to allow kafka starts
688 if self.consecutive_errors == 8 if not self.first_start else 30:
689 self.logger.error(
690 "Task kafka_read task exit error too many errors. Exception: {}".format(
691 e
692 )
693 )
694 raise
695 self.consecutive_errors += 1
696 self.logger.error(
697 "Task kafka_read retrying after Exception {}".format(e)
698 )
699 wait_time = 2 if not self.first_start else 5
700 await asyncio.sleep(wait_time)
701
702 # self.logger.debug("Task kafka_read terminating")
703 self.logger.debug("Task kafka_read exit")
704
705 async def kafka_read_ping(self):
706 await asyncio.gather(self.kafka_read(), self.kafka_ping())
707
    def start(self):
        """Entry point of the service after construction.

        Verifies RO compatibility, builds one handler object per topic, then
        blocks running the Kafka read/ping loops until exit; finally disconnects
        database, messaging and filesystem.
        """
        # check RO version
        asyncio.run(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )

        # blocks until the consumer loop finishes (exit command or fatal error)
        asyncio.run(self.kafka_read_ping())

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
748
749 def read_config_file(self, config_file):
750 try:
751 with open(config_file) as f:
752 return yaml.safe_load(f)
753 except Exception as e:
754 self.logger.critical("At config file '{}': {}".format(config_file, e))
755 exit(1)
756
757 @staticmethod
758 def get_process_id():
759 """
760 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
761 will provide a random one
762 :return: Obtained ID
763 """
764 # Try getting docker id. If fails, get pid
765 try:
766 with open("/proc/self/cgroup", "r") as f:
767 text_id_ = f.readline()
768 _, _, text_id = text_id_.rpartition("/")
769 text_id = text_id.replace("\n", "")[:12]
770 if text_id:
771 return text_id
772 except Exception:
773 pass
774 # Return a random id
775 return "".join(random_choice("0123456789abcdef") for _ in range(12))
776
777
def usage():
    """Print command-line usage help for the LCM entry point to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0])
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
790
791
if __name__ == "__main__":
    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # imported lazily: only needed in health-check mode
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # no -c given: search the default locations in order
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)