Fix loading of boolean values in configuration and set missing default values
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from osm_lcm.data_utils.lcm_config import LcmCfg
48 from osm_lcm.lcm_hc import get_health_check_file
49 from os import path
50 from random import choice as random_choice
51 from n2vc import version as n2vc_version
52 import traceback
53
# Drop into the debugger at import time when requested through the environment
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of the companion OSM components this LCM can work with;
# checked in Lcm.__init__ (N2VC, common) and Lcm.check_RO_version (RO).
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
63
64
65 class Lcm:
66
67 ping_interval_pace = (
68 120 # how many time ping is send once is confirmed all is running
69 )
70 ping_interval_boot = 5 # how many time ping is sent when booting
71
72 main_config = LcmCfg()
73
    def __init__(self, config_file, loop=None):
        """
        Init: load configuration, configure logging, and connect to database,
        filesystem storage and messaging.

        :param config_file: path of a YAML file with two-level configuration.
            Top level should contain 'database', 'storage', 'message', etc.
        :param loop: optional asyncio event loop; a default one is obtained
            when not provided
        :return: None
        :raises LcmException: on incompatible component versions or on
            database/storage/messaging connection errors
        """
        # connection handles; stay None until successfully connected so that
        # start() can disconnect only what was actually opened
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        # kafka liveness tracking, shared with kafka_ping/kafka_read_callback
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        self.loop = loop or asyncio.get_event_loop()
        # per-topic LCM engines; instantiated later in start()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
197
    async def check_RO_version(self):
        """Poll RO until it answers, validate its version, and detect flavour.

        Tries the new-generation RO endpoint ("<uri>ro") first and falls back
        to the legacy one ("<uri>openmano"), recording which one answered in
        self.main_config.RO.ng. Retries up to 14 times with a 5 second pause
        between attempts.

        :raises LcmException: when RO cannot be reached after all retries, or
            its version is lower than min_RO_version
        """
        tries = 14
        last_error = None
        while True:
            # remember the configured base URI so it can be restored on error
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the base URI before the next retry
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # avoid logging the same error text on every retry
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
245
246 async def test(self, param=None):
247 self.logger.debug("Starting/Ending test task: {}".format(param))
248
249 async def kafka_ping(self):
250 self.logger.debug("Task kafka_ping Enter")
251 consecutive_errors = 0
252 first_start = True
253 kafka_has_received = False
254 self.pings_not_received = 1
255 while True:
256 try:
257 await self.msg_admin.aiowrite(
258 "admin",
259 "ping",
260 {
261 "from": "lcm",
262 "to": "lcm",
263 "worker_id": self.worker_id,
264 "version": lcm_version,
265 },
266 self.loop,
267 )
268 # time between pings are low when it is not received and at starting
269 wait_time = (
270 self.ping_interval_boot
271 if not kafka_has_received
272 else self.ping_interval_pace
273 )
274 if not self.pings_not_received:
275 kafka_has_received = True
276 self.pings_not_received += 1
277 await asyncio.sleep(wait_time, loop=self.loop)
278 if self.pings_not_received > 10:
279 raise LcmException("It is not receiving pings from Kafka bus")
280 consecutive_errors = 0
281 first_start = False
282 except LcmException:
283 raise
284 except Exception as e:
285 # if not first_start is the first time after starting. So leave more time and wait
286 # to allow kafka starts
287 if consecutive_errors == 8 if not first_start else 30:
288 self.logger.error(
289 "Task kafka_read task exit error too many errors. Exception: {}".format(
290 e
291 )
292 )
293 raise
294 consecutive_errors += 1
295 self.logger.error(
296 "Task kafka_read retrying after Exception {}".format(e)
297 )
298 wait_time = 2 if not first_start else 5
299 await asyncio.sleep(wait_time, loop=self.loop)
300
    def kafka_read_callback(self, topic, command, params):
        """Dispatch one kafka message to the corresponding LCM engine.

        :param topic: kafka topic of the message ("ns", "nsi", "vim_account", ...)
        :param command: operation within the topic ("instantiate", "delete", ...)
        :param params: payload of the message; usually a dict (an lcmop or
            account record), but a plain id string for the "show" commands
        :return: None. Long-running operations are wrapped in asyncio tasks
            and registered in self.lcm_tasks so they can be cancelled later.
        """
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # any received message proves the kafka bus is healthy: reset counters
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # generic commands, valid on any topic
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            # "commented out" command: ignore
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                # only react to this worker's own ping; others belong to peers
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    # refresh the healthcheck file timestamp so external
                    # monitoring sees this process as alive
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any ongoing operation on this NS before terminating
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): self.lcm_ns_tasks is not defined anywhere in
                    # this class (only self.lcm_tasks exists); this debug
                    # command likely raises AttributeError — confirm and fix
                    print(
                        "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_ns_tasks.get(nsr_id),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # completion notifications: nothing to do
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): self.lcm_netslice_tasks is not defined in
                    # this class (only self.lcm_tasks exists); this debug
                    # command likely raises AttributeError — confirm and fix
                    print(
                        "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_netslice_tasks.get(nsir_id),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with new-generation RO the account is handled by RO itself
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        # nothing above matched: unexpected message
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
640
641 async def kafka_read(self):
642 self.logger.debug(
643 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
644 )
645 # future = asyncio.Future()
646 self.consecutive_errors = 0
647 self.first_start = True
648 while self.consecutive_errors < 10:
649 try:
650 topics = (
651 "ns",
652 "vim_account",
653 "wim_account",
654 "sdn",
655 "nsi",
656 "k8scluster",
657 "vca",
658 "k8srepo",
659 "pla",
660 )
661 topics_admin = ("admin",)
662 await asyncio.gather(
663 self.msg.aioread(
664 topics, self.loop, self.kafka_read_callback, from_beginning=True
665 ),
666 self.msg_admin.aioread(
667 topics_admin,
668 self.loop,
669 self.kafka_read_callback,
670 group_id=False,
671 ),
672 )
673
674 except LcmExceptionExit:
675 self.logger.debug("Bye!")
676 break
677 except Exception as e:
678 # if not first_start is the first time after starting. So leave more time and wait
679 # to allow kafka starts
680 if self.consecutive_errors == 8 if not self.first_start else 30:
681 self.logger.error(
682 "Task kafka_read task exit error too many errors. Exception: {}".format(
683 e
684 )
685 )
686 raise
687 self.consecutive_errors += 1
688 self.logger.error(
689 "Task kafka_read retrying after Exception {}".format(e)
690 )
691 wait_time = 2 if not self.first_start else 5
692 await asyncio.sleep(wait_time, loop=self.loop)
693
694 # self.logger.debug("Task kafka_read terminating")
695 self.logger.debug("Task kafka_read exit")
696
    def start(self):
        """Blocking entry point: check RO availability, instantiate the
        per-topic LCM engines, run the kafka read/ping loops until exit, and
        disconnect every backend on the way out.
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.wim = vim_sdn.WimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.sdn = vim_sdn.SdnLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.vca = vim_sdn.VcaLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )

        # blocks here until kafka_read finishes (exit command or error limit)
        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        # disconnect only the backends that were actually connected
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
750
751 def read_config_file(self, config_file):
752 try:
753 with open(config_file) as f:
754 return yaml.safe_load(f)
755 except Exception as e:
756 self.logger.critical("At config file '{}': {}".format(config_file, e))
757 exit(1)
758
759 @staticmethod
760 def get_process_id():
761 """
762 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
763 will provide a random one
764 :return: Obtained ID
765 """
766 # Try getting docker id. If fails, get pid
767 try:
768 with open("/proc/self/cgroup", "r") as f:
769 text_id_ = f.readline()
770 _, _, text_id = text_id_.rpartition("/")
771 text_id = text_id.replace("\n", "")[:12]
772 if text_id:
773 return text_id
774 except Exception:
775 pass
776 # Return a random id
777 return "".join(random_choice("0123456789abcdef") for _ in range(12))
778
779
def usage():
    """Print the command-line help text for this module."""
    prog = sys.argv[0]
    print(
        """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(prog)
    )
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
792
793
if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                # NOTE(review): presumably health_check exits the process and
                # never returns here — confirm against osm_lcm.lcm_hc
                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # no -c given: search the default locations in priority order;
            # the for/else fires only when no candidate file exists
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)