# osm_lcm/lcm.py (osm/LCM.git) — Feature 10929: LCM saga, Milestone 1.
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31 import configparser
32
33 from osm_lcm import ns, vim_sdn, netslice
34 from osm_lcm.ng_ro import NgRoException, NgRoClient
35 from osm_lcm.ROclient import ROClient, ROClientException
36
37 from time import time
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48 from osm_lcm.data_utils.lcm_config import LcmCfg
49 from osm_lcm.lcm_hc import get_health_check_file
50 from os import path
51 from random import choice as random_choice
52 from n2vc import version as n2vc_version
53 import traceback
54
# Drop into the pdb debugger at import time when OSMLCM_PDB_DEBUG is set
# (any value, including empty string). Development aid only.
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of peer OSM components this LCM is known to work with.
# RO is checked at runtime in Lcm.check_RO_version(); n2vc and common are
# checked once in Lcm.__init__ via versiontuple().
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
64
65
class Lcm:
    """
    OSM Life Cycle Management service.

    Reads operation requests from the kafka bus and spawns the corresponding
    lifecycle tasks (NS, network slice, VIM/WIM/SDN accounts, k8s clusters,
    VCAs and k8s repos), tracking them through a TaskRegistry.
    """

    # seconds between self-pings once the kafka round-trip is confirmed working
    ping_interval_pace = (
        120  # how many time ping is send once is confirmed all is running
    )
    # seconds between self-pings while booting (before the first ping returns)
    ping_interval_boot = 5  # how many time ping is sent when booting

    # NOTE(review): class-level attribute, so this LcmCfg instance is shared by
    # all Lcm instances (and read by the --health-check path before any
    # instance exists) — confirm that sharing is intentional
    main_config = LcmCfg()
    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging.

        :param config_file: path of the configuration file, parsed by read_config_file()
        :param loop: optional asyncio event loop; asyncio.get_event_loop() is used if None
        :return: None
        :raises LcmException: on incompatible n2vc/common versions or db/fs/msg connection errors
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        # counters driving the kafka self-ping health mechanism; see
        # kafka_ping() and kafka_read_callback()
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        # critical level so the banner is emitted regardless of configured loglevel
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        self.loop = loop or asyncio.get_event_loop()
        # per-topic LCM modules; all None until start() instantiates them
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            # NOTE(review): maxBytes is a float (100e6); RotatingFileHandler
            # accepts it, but an int literal would be cleaner
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
198
    async def check_RO_version(self):
        """
        Wait until RO answers and verify its version is at least min_RO_version.

        Tries the new-generation RO first (uri + "ro"); on any failure falls
        back to the legacy one (uri + "openmano"), recording which flavour
        answered in self.main_config.RO.ng. Retries up to 14 times, 5 seconds
        apart.

        :raises LcmException: when retries are exhausted or RO is too old
        """
        tries = 14
        last_error = None
        while True:
            # remember the configured base uri; it is temporarily overwritten below
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                # NOTE: this LcmException is not caught by the except clause
                # below, so an incompatible version aborts without retrying
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the original uri before the next attempt
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # log only when the error text changes, to avoid flooding
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
246
247 async def test(self, param=None):
248 self.logger.debug("Starting/Ending test task: {}".format(param))
249
    async def kafka_ping(self):
        """
        Periodically send a self-addressed "ping" on the 'admin' topic.

        kafka_read_callback() zeroes self.pings_not_received when the ping comes
        back, proving the kafka round-trip works (and refreshing the healthcheck
        file there). Raises LcmException when more than 10 pings in a row are
        lost; retries transient send errors with a backoff threshold.
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # time between pings are low when it is not received and at starting
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                # NOTE(review): the explicit loop= argument was deprecated in
                # Python 3.8 and removed in 3.10 — confirm target interpreter
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE(review): this parses as
                # consecutive_errors == (8 if not first_start else 30)
                if consecutive_errors == 8 if not first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)
301
302 def kafka_read_callback(self, topic, command, params):
303 order_id = 1
304
305 if topic != "admin" and command != "ping":
306 self.logger.debug(
307 "Task kafka_read receives {} {}: {}".format(topic, command, params)
308 )
309 self.consecutive_errors = 0
310 self.first_start = False
311 order_id += 1
312 if command == "exit":
313 raise LcmExceptionExit
314 elif command.startswith("#"):
315 return
316 elif command == "echo":
317 # just for test
318 print(params)
319 sys.stdout.flush()
320 return
321 elif command == "test":
322 asyncio.Task(self.test(params), loop=self.loop)
323 return
324
325 if topic == "admin":
326 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
327 if params.get("worker_id") != self.worker_id:
328 return
329 self.pings_not_received = 0
330 try:
331 with open(self.health_check_file, "w") as f:
332 f.write(str(time()))
333 except Exception as e:
334 self.logger.error(
335 "Cannot write into '{}' for healthcheck: {}".format(
336 self.health_check_file, e
337 )
338 )
339 return
340 elif topic == "pla":
341 if command == "placement":
342 self.ns.update_nsrs_with_pla_result(params)
343 return
344 elif topic == "k8scluster":
345 if command == "create" or command == "created":
346 k8scluster_id = params.get("_id")
347 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
348 self.lcm_tasks.register(
349 "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
350 )
351 return
352 elif command == "delete" or command == "deleted":
353 k8scluster_id = params.get("_id")
354 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
355 self.lcm_tasks.register(
356 "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
357 )
358 return
359 elif topic == "vca":
360 if command == "create" or command == "created":
361 vca_id = params.get("_id")
362 task = asyncio.ensure_future(self.vca.create(params, order_id))
363 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
364 return
365 elif command == "delete" or command == "deleted":
366 vca_id = params.get("_id")
367 task = asyncio.ensure_future(self.vca.delete(params, order_id))
368 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
369 return
370 elif topic == "k8srepo":
371 if command == "create" or command == "created":
372 k8srepo_id = params.get("_id")
373 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
374 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
375 self.lcm_tasks.register(
376 "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
377 )
378 return
379 elif command == "delete" or command == "deleted":
380 k8srepo_id = params.get("_id")
381 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
382 self.lcm_tasks.register(
383 "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
384 )
385 return
386 elif topic == "ns":
387 if command == "instantiate":
388 # self.logger.debug("Deploying NS {}".format(nsr_id))
389 nslcmop = params
390 nslcmop_id = nslcmop["_id"]
391 nsr_id = nslcmop["nsInstanceId"]
392 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
393 self.lcm_tasks.register(
394 "ns", nsr_id, nslcmop_id, "ns_instantiate", task
395 )
396 return
397 elif command == "terminate":
398 # self.logger.debug("Deleting NS {}".format(nsr_id))
399 nslcmop = params
400 nslcmop_id = nslcmop["_id"]
401 nsr_id = nslcmop["nsInstanceId"]
402 self.lcm_tasks.cancel(topic, nsr_id)
403 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
404 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
405 return
406 elif command == "vca_status_refresh":
407 nslcmop = params
408 nslcmop_id = nslcmop["_id"]
409 nsr_id = nslcmop["nsInstanceId"]
410 task = asyncio.ensure_future(
411 self.ns.vca_status_refresh(nsr_id, nslcmop_id)
412 )
413 self.lcm_tasks.register(
414 "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
415 )
416 return
417 elif command == "action":
418 # self.logger.debug("Update NS {}".format(nsr_id))
419 nslcmop = params
420 nslcmop_id = nslcmop["_id"]
421 nsr_id = nslcmop["nsInstanceId"]
422 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
423 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
424 return
425 elif command == "update":
426 # self.logger.debug("Update NS {}".format(nsr_id))
427 nslcmop = params
428 nslcmop_id = nslcmop["_id"]
429 nsr_id = nslcmop["nsInstanceId"]
430 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
431 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
432 return
433 elif command == "scale":
434 # self.logger.debug("Update NS {}".format(nsr_id))
435 nslcmop = params
436 nslcmop_id = nslcmop["_id"]
437 nsr_id = nslcmop["nsInstanceId"]
438 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
439 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
440 return
441 elif command == "heal":
442 # self.logger.debug("Healing NS {}".format(nsr_id))
443 nslcmop = params
444 nslcmop_id = nslcmop["_id"]
445 nsr_id = nslcmop["nsInstanceId"]
446 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
447 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
448 return
449 elif command == "migrate":
450 nslcmop = params
451 nslcmop_id = nslcmop["_id"]
452 nsr_id = nslcmop["nsInstanceId"]
453 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
454 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
455 return
456 elif command == "verticalscale":
457 nslcmop = params
458 nslcmop_id = nslcmop["_id"]
459 nsr_id = nslcmop["nsInstanceId"]
460 task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
461 self.logger.debug(
462 "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
463 )
464 self.lcm_tasks.register(
465 "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
466 )
467 self.logger.debug(
468 "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
469 )
470 return
471 elif command == "show":
472 nsr_id = params
473 try:
474 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
475 print(
476 "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
477 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
478 "".format(
479 nsr_id,
480 db_nsr["operational-status"],
481 db_nsr["config-status"],
482 db_nsr["detailed-status"],
483 db_nsr["_admin"]["deployed"],
484 self.lcm_ns_tasks.get(nsr_id),
485 )
486 )
487 except Exception as e:
488 print("nsr {} not found: {}".format(nsr_id, e))
489 sys.stdout.flush()
490 return
491 elif command == "deleted":
492 return # TODO cleaning of task just in case should be done
493 elif command in (
494 "vnf_terminated",
495 "policy_updated",
496 "terminated",
497 "instantiated",
498 "scaled",
499 "healed",
500 "actioned",
501 "updated",
502 "migrated",
503 "verticalscaled",
504 ): # "scaled-cooldown-time"
505 return
506
507 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
508 if command == "instantiate":
509 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
510 nsilcmop = params
511 nsilcmop_id = nsilcmop["_id"] # slice operation id
512 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
513 task = asyncio.ensure_future(
514 self.netslice.instantiate(nsir_id, nsilcmop_id)
515 )
516 self.lcm_tasks.register(
517 "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
518 )
519 return
520 elif command == "terminate":
521 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
522 nsilcmop = params
523 nsilcmop_id = nsilcmop["_id"] # slice operation id
524 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
525 self.lcm_tasks.cancel(topic, nsir_id)
526 task = asyncio.ensure_future(
527 self.netslice.terminate(nsir_id, nsilcmop_id)
528 )
529 self.lcm_tasks.register(
530 "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
531 )
532 return
533 elif command == "show":
534 nsir_id = params
535 try:
536 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
537 print(
538 "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
539 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
540 "".format(
541 nsir_id,
542 db_nsir["operational-status"],
543 db_nsir["config-status"],
544 db_nsir["detailed-status"],
545 db_nsir["_admin"]["deployed"],
546 self.lcm_netslice_tasks.get(nsir_id),
547 )
548 )
549 except Exception as e:
550 print("nsir {} not found: {}".format(nsir_id, e))
551 sys.stdout.flush()
552 return
553 elif command == "deleted":
554 return # TODO cleaning of task just in case should be done
555 elif command in (
556 "terminated",
557 "instantiated",
558 "scaled",
559 "healed",
560 "actioned",
561 ): # "scaled-cooldown-time"
562 return
563 elif topic == "vim_account":
564 vim_id = params["_id"]
565 if command in ("create", "created"):
566 if not self.main_config.RO.ng:
567 task = asyncio.ensure_future(self.vim.create(params, order_id))
568 self.lcm_tasks.register(
569 "vim_account", vim_id, order_id, "vim_create", task
570 )
571 return
572 elif command == "delete" or command == "deleted":
573 self.lcm_tasks.cancel(topic, vim_id)
574 task = asyncio.ensure_future(self.vim.delete(params, order_id))
575 self.lcm_tasks.register(
576 "vim_account", vim_id, order_id, "vim_delete", task
577 )
578 return
579 elif command == "show":
580 print("not implemented show with vim_account")
581 sys.stdout.flush()
582 return
583 elif command in ("edit", "edited"):
584 if not self.main_config.RO.ng:
585 task = asyncio.ensure_future(self.vim.edit(params, order_id))
586 self.lcm_tasks.register(
587 "vim_account", vim_id, order_id, "vim_edit", task
588 )
589 return
590 elif command == "deleted":
591 return # TODO cleaning of task just in case should be done
592 elif topic == "wim_account":
593 wim_id = params["_id"]
594 if command in ("create", "created"):
595 if not self.main_config.RO.ng:
596 task = asyncio.ensure_future(self.wim.create(params, order_id))
597 self.lcm_tasks.register(
598 "wim_account", wim_id, order_id, "wim_create", task
599 )
600 return
601 elif command == "delete" or command == "deleted":
602 self.lcm_tasks.cancel(topic, wim_id)
603 task = asyncio.ensure_future(self.wim.delete(params, order_id))
604 self.lcm_tasks.register(
605 "wim_account", wim_id, order_id, "wim_delete", task
606 )
607 return
608 elif command == "show":
609 print("not implemented show with wim_account")
610 sys.stdout.flush()
611 return
612 elif command in ("edit", "edited"):
613 task = asyncio.ensure_future(self.wim.edit(params, order_id))
614 self.lcm_tasks.register(
615 "wim_account", wim_id, order_id, "wim_edit", task
616 )
617 return
618 elif command == "deleted":
619 return # TODO cleaning of task just in case should be done
620 elif topic == "sdn":
621 _sdn_id = params["_id"]
622 if command in ("create", "created"):
623 if not self.main_config.RO.ng:
624 task = asyncio.ensure_future(self.sdn.create(params, order_id))
625 self.lcm_tasks.register(
626 "sdn", _sdn_id, order_id, "sdn_create", task
627 )
628 return
629 elif command == "delete" or command == "deleted":
630 self.lcm_tasks.cancel(topic, _sdn_id)
631 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
632 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
633 return
634 elif command in ("edit", "edited"):
635 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
636 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
637 return
638 elif command == "deleted":
639 return # TODO cleaning of task just in case should be done
640 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
641
    async def kafka_read(self):
        """
        Main consumer loop: read the LCM topics (plus 'admin' for self-pings)
        and dispatch each message to kafka_read_callback(). Transient read
        errors are retried; the loop ends on LcmExceptionExit or after too
        many consecutive failures.
        """
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                # the admin reader uses group_id=False so every worker sees its
                # own ping; the main reader starts from the beginning of topics
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE(review): this parses as
                # self.consecutive_errors == (8 if not self.first_start else 30)
                if self.consecutive_errors == 8 if not self.first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                # NOTE(review): loop= was removed from asyncio.sleep in Python 3.10
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")
697
    def start(self):
        """
        Blocking entry point: wait for a compatible RO, instantiate the
        per-topic LCM modules, run the kafka read/ping tasks until exit,
        then close the loop and disconnect db/msg/fs.
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.wim = vim_sdn.WimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.sdn = vim_sdn.SdnLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.vca = vim_sdn.VcaLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )

        # runs until kafka_read() breaks out (LcmExceptionExit) or an
        # unrecoverable error propagates from either task
        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
751
752 def read_config_file(self, config_file):
753 # TODO make a [ini] + yaml inside parser
754 # the configparser library is not suitable, because it does not admit comments at the end of line,
755 # and not parse integer or boolean
756 conf = {}
757 try:
758 # read file as yaml format
759 config = configparser.ConfigParser(inline_comment_prefixes="#")
760 config.read(config_file)
761 conf = {s: dict(config.items(s)) for s in config.sections()}
762 except Exception as e:
763 self.logger.critical("At config file '{}': {}".format(config_file, e))
764 self.logger.critical("Trying to load config as legacy mode")
765 try:
766 with open(config_file) as f:
767 conf = yaml.safe_load(f)
768 # Ensure all sections are not empty
769 for k in (
770 "global",
771 "timeout",
772 "RO",
773 "VCA",
774 "database",
775 "storage",
776 "message",
777 ):
778 if not conf.get(k):
779 conf[k] = {}
780 except Exception as e:
781 self.logger.critical("At config file '{}': {}".format(config_file, e))
782 exit(1)
783 return conf
784
785 @staticmethod
786 def get_process_id():
787 """
788 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
789 will provide a random one
790 :return: Obtained ID
791 """
792 # Try getting docker id. If fails, get pid
793 try:
794 with open("/proc/self/cgroup", "r") as f:
795 text_id_ = f.readline()
796 _, _, text_id = text_id_.rpartition("/")
797 text_id = text_id.replace("\n", "")[:12]
798 if text_id:
799 return text_id
800 except Exception:
801 pass
802 # Return a random id
803 return "".join(random_choice("0123456789abcdef") for _ in range(12))
804
805
def usage():
    """Print command-line help for the lcm module to stdout."""
    help_text = """Usage: {} [options]
      -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
      --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
      -h|--help: shows this help
      """.format(
        sys.argv[0]
    )
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
818
819
if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                # NOTE(review): presumably health_check() terminates the
                # process itself (there is no exit here, and execution would
                # otherwise fall through and start a full LCM) — confirm
                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                # NOTE(review): assert is stripped under python -O; an explicit
                # raise would be safer for unhandled options
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # no -c given: look for lcm.cfg next to this file, in cwd, then /etc/osm
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)