5ed39abcf89c26acc1d25462256a2a217d89feb3
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import pdb
23
24 import asyncio
25 import yaml
26 import logging
27 import logging.handlers
28 import getopt
29 import sys
30 from random import SystemRandom
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from osm_lcm.data_utils.lcm_config import LcmCfg
48 from osm_lcm.lcm_hc import get_health_check_file
49 from os import path, getenv
50 from n2vc import version as n2vc_version
51 import traceback
52
# Drop into the debugger at import time when OSMLCM_PDB_DEBUG is set in the
# environment (only presence matters, the value itself is ignored).
if getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()
55
56
__author__ = "Alfonso Tierno"
# Minimum compatible versions of peer OSM components; enforced at startup
# (RO in check_RO_version(), N2VC and common in Lcm.__init__()).
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
62
63
class Lcm:
    # Seconds between self-pings once the kafka bus is confirmed working.
    ping_interval_pace = (
        120  # how many time ping is send once is confirmed all is running
    )
    # Seconds between self-pings while booting, before the bus is confirmed.
    ping_interval_boot = 5  # how many time ping is sent when booting

    # NOTE(review): class-level attribute, shared by all instances — presumably
    # a single Lcm instance per process is assumed; confirm before creating more.
    main_config = LcmCfg()
    def __init__(self, config_file):
        """
        Init, Connect to database, filesystem storage, and messaging.

        :param config_file: path to the two-level configuration file. Top level
            should contain 'database', 'storage', 'message', etc.
        :raises LcmException: on incompatible N2VC/common versions or on any
            database/filesystem/messaging connection error.
        :return: None
        """
        # Connections/clients; populated below or left None until start()
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        # Self-ping bookkeeping, used by kafka_ping()/kafka_read_callback()
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        # critical level so the effective configuration is always visible in logs
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        # Per-topic handlers; instantiated in start() once the RO version is known
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules: each gets its own file handler/level if configured
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = asyncio.get_event_loop()
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                # msg_admin must not share a consumer group: every worker has
                # to see the admin (ping) messages, not just one of the group
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
194
    async def check_RO_version(self):
        """
        Connect to RO and verify its version is compatible.

        Tries the new-generation RO endpoint ('<uri>ro') first and falls back
        to the legacy one ('<uri>openmano'); sets self.main_config.RO.ng
        accordingly. Retries up to 14 times, sleeping 5s between attempts.

        :raises LcmException: if the RO version is below min_RO_version or it
            cannot be contacted after all retries.
        """
        tries = 14
        last_error = None
        while True:
            # Keep the original (base) uri so it can be restored after a failure
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                # NOTE: LcmException here is NOT caught below, so a version
                # mismatch aborts immediately instead of being retried.
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # Restore the base uri before retrying
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # Only log when the error text changes, to avoid log spam
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
242
243 async def test(self, param=None):
244 self.logger.debug("Starting/Ending test task: {}".format(param))
245
    async def kafka_ping(self):
        """
        Periodically write a self-addressed 'ping' on the admin topic to verify
        the kafka bus is healthy. Reception is recorded by
        kafka_read_callback() clearing self.pings_not_received.

        :raises LcmException: when more than 10 consecutive pings go unanswered.
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                )
                # time between pings are low when it is not received and at starting
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE: by operator precedence this parses as
                # consecutive_errors == (8 if not first_start else 30),
                # i.e. the error budget is 8 normally and 30 during first start.
                if consecutive_errors == 8 if not first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time)
296
    def kafka_read_callback(self, topic, command, params):
        """
        Dispatch one kafka message to the proper handler.

        For most topics this spawns an asyncio task on the corresponding
        handler object (self.ns, self.vim, ...) and registers it in
        self.lcm_tasks so it can later be cancelled.

        :param topic: kafka topic the message arrived on ("ns", "admin", ...)
        :param command: operation name ("instantiate", "delete", "ping", ...)
        :param params: message payload; usually a dict (the lcm operation), but
            a plain id string for the debug "show" commands
        :raises LcmExceptionExit: on the special "exit" command
        """
        order_id = 1

        # Do not log admin traffic nor pings (they are frequent housekeeping)
        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # Any received message proves the bus works: reset error accounting
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # Debug/control commands, valid on any topic
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params))
            return

        if topic == "admin":
            # Self-ping reception: touch the health-check file so external
            # monitors can see this worker is alive
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # Cancel any pending task on this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                # Debug helper: params is the nsr id, result goes to stdout
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # Completion notifications: nothing to do
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # Debug helper: params is the nsir id, result goes to stdout
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # With new-generation RO the account is handled by RO itself
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" already matched above
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" already matched above
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" already matched above
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
648
    async def kafka_read(self):
        """
        Consume the kafka topics this worker handles, dispatching every
        message through kafka_read_callback(). Retries on transient errors,
        giving up after too many consecutive failures; exits cleanly on
        LcmExceptionExit (the 'exit' command).
        """
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.kafka_read_callback, from_beginning=True
                    ),
                    # admin topic is read without a consumer group so every
                    # worker receives the ping messages (see __init__)
                    self.msg_admin.aioread(
                        topics_admin,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE: by operator precedence this parses as
                # self.consecutive_errors == (8 if not self.first_start else 30)
                if self.consecutive_errors == 8 if not self.first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")
703
704 async def kafka_read_ping(self):
705 await asyncio.gather(self.kafka_read(), self.kafka_ping())
706
    async def start(self):
        """
        Main entry coroutine: verify RO, build the per-topic handler objects,
        then consume kafka until exit; finally disconnect db/msg/fs.
        """
        # check RO version
        await self.check_RO_version()

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )

        # Blocks until kafka_read() finishes (exit command or too many errors)
        await self.kafka_read_ping()

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
747
748 def read_config_file(self, config_file):
749 try:
750 with open(config_file) as f:
751 return yaml.safe_load(f)
752 except Exception as e:
753 self.logger.critical("At config file '{}': {}".format(config_file, e))
754 exit(1)
755
756 @staticmethod
757 def get_process_id():
758 """
759 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
760 will provide a random one
761 :return: Obtained ID
762 """
763
764 def get_docker_id():
765 try:
766 with open("/proc/self/cgroup", "r") as f:
767 text_id_ = f.readline()
768 _, _, text_id = text_id_.rpartition("/")
769 return text_id.replace("\n", "")[:12]
770 except Exception:
771 return None
772
773 def generate_random_id():
774 return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
775
776 # Try getting docker id. If it fails, generate a random id
777 docker_id = get_docker_id()
778 return docker_id if docker_id else generate_random_id()
779
780
def usage():
    """Print the command-line help text for this program to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0])
    print(help_text)
791 # --log-socket-host HOST: send logs to this host")
792 # --log-socket-port PORT: send logs using this port (default: 9022)")
793
794
if __name__ == "__main__":
    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                # NOTE(review): execution continues after this call, so
                # presumably health_check exits the process itself — confirm.
                # Also, if --health-check precedes -c on the command line,
                # config_file is still None here.
                health_check(config_file, Lcm.ping_interval_pace)
            else:
                print(f"Unhandled option: {o}")
                exit(1)

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # No -c given: look for a default config next to this file,
            # in the working directory, then in /etc/osm
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        asyncio.run(lcm.start())
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)