Reformat LCM to standardized format
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from os import environ, path
48 from random import choice as random_choice
49 from n2vc import version as n2vc_version
50 import traceback
51
52 if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
53 pdb.set_trace()
54
55
56 __author__ = "Alfonso Tierno"
57 min_RO_version = "6.0.2"
58 min_n2vc_version = "0.0.2"
59
60 min_common_version = "0.1.19"
61 health_check_file = (
62 path.expanduser("~") + "/time_last_ping"
63 ) # TODO find better location for this file
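# The admin "ping" handler in kafka_read_callback() writes the current timestamp to this file;
# the --health-check command line option (see __main__) passes the same path to
# osm_lcm.lcm_hc.health_check, which is expected to use that stored timestamp to decide
# whether this LCM instance is still alive.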
64
65
66 class Lcm:
67
68 ping_interval_pace = (
69 120 # seconds between pings once everything is confirmed to be running
70 )
71 ping_interval_boot = 5 # seconds between pings while booting
72 cfg_logger_name = {
73 "message": "lcm.msg",
74 "database": "lcm.db",
75 "storage": "lcm.fs",
76 "tsdb": "lcm.prometheus",
77 }
78 # ^ maps each lcm.cfg section to the logger name it uses
79
80 def __init__(self, config_file, loop=None):
81 """
82 Init, Connect to database, filesystem storage, and messaging
83 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 :return: None
85 """
86 self.db = None
87 self.msg = None
88 self.msg_admin = None
89 self.fs = None
90 self.pings_not_received = 1
91 self.consecutive_errors = 0
92 self.first_start = False
93
94 # logging
95 self.logger = logging.getLogger("lcm")
96 # get id
97 self.worker_id = self.get_process_id()
98 # load configuration
99 config = self.read_config_file(config_file)
100 self.config = config
101 self.config["ro_config"] = {
102 "ng": config["RO"].get("ng", False),
103 "uri": config["RO"].get("uri"),
104 "tenant": config.get("tenant", "osm"),
105 "logger_name": "lcm.roclient",
106 "loglevel": config["RO"].get("loglevel", "ERROR"),
107 }
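# "ng" selects the new-generation RO client (NgRoClient) instead of the legacy ROClient;
# check_RO_version() probes both endpoints and overwrites this flag with whatever it finds.
# When "ng" is set, the vim_account/wim_account/sdn "create" handlers in kafka_read_callback()
# are skipped (see the "if not self.config['ro_config'].get('ng')" checks below).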
108 if not self.config["ro_config"]["uri"]:
109 self.config["ro_config"]["uri"] = "http://{}:{}/".format(
110 config["RO"]["host"], config["RO"]["port"]
111 )
112 elif (
113 "/ro" in self.config["ro_config"]["uri"][-4:]
114 or "/openmano" in self.config["ro_config"]["uri"][-10:]
115 ):
116 # uri ends with '/ro', '/ro/', '/openmano' or '/openmano/': strip that suffix, keep the base uri
117 index = self.config["ro_config"]["uri"][:-1].rfind("/")
118 self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][: index + 1]
119
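# Illustrative effect of the normalization above (the hostname is just an example):
# "http://ro:9090/ro" or "http://ro:9090/openmano/" both become "http://ro:9090/",
# so that check_RO_version() can append "ro" or "openmano" itself when probing the RO server.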
120 self.loop = loop or asyncio.get_event_loop()
121 self.ns = self.netslice = self.vim = self.wim = None
122 self.sdn = self.k8scluster = self.vca = self.k8srepo = None
126
127 # logging
128 log_format_simple = (
129 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
130 )
131 log_formatter_simple = logging.Formatter(
132 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
133 )
134 config["database"]["logger_name"] = "lcm.db"
135 config["storage"]["logger_name"] = "lcm.fs"
136 config["message"]["logger_name"] = "lcm.msg"
137 if config["global"].get("logfile"):
138 file_handler = logging.handlers.RotatingFileHandler(
139 config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
140 )
141 file_handler.setFormatter(log_formatter_simple)
142 self.logger.addHandler(file_handler)
143 if not config["global"].get("nologging"):
144 str_handler = logging.StreamHandler()
145 str_handler.setFormatter(log_formatter_simple)
146 self.logger.addHandler(str_handler)
147
148 if config["global"].get("loglevel"):
149 self.logger.setLevel(config["global"]["loglevel"])
150
151 # logging other modules
152 for k1, logname in self.cfg_logger_name.items():
153 config[k1]["logger_name"] = logname
154 logger_module = logging.getLogger(logname)
155 if config[k1].get("logfile"):
156 file_handler = logging.handlers.RotatingFileHandler(
157 config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
158 )
159 file_handler.setFormatter(log_formatter_simple)
160 logger_module.addHandler(file_handler)
161 if config[k1].get("loglevel"):
162 logger_module.setLevel(config[k1]["loglevel"])
163 self.logger.critical(
164 "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
165 )
166
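# versiontuple() (from lcm_utils) turns a dotted version string into a tuple that can be
# compared element by element, e.g. "0.1.19" into something like (0, 1, 19).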
167 # check version of N2VC
168 # TODO enhance with int conversion or from distutils.version import LooseVersion
169 # or with list(map(int, version.split(".")))
170 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
171 raise LcmException(
172 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
173 n2vc_version, min_n2vc_version
174 )
175 )
176 # check version of common
177 if versiontuple(common_version) < versiontuple(min_common_version):
178 raise LcmException(
179 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
180 common_version, min_common_version
181 )
182 )
183
184 try:
185 self.db = Database(config).instance.db
186
187 self.fs = Filesystem(config).instance.fs
188
189 # copy message configuration in order to remove 'group_id' for msg_admin
190 config_message = config["message"].copy()
191 config_message["loop"] = self.loop
192 if config_message["driver"] == "local":
193 self.msg = msglocal.MsgLocal()
194 self.msg.connect(config_message)
195 self.msg_admin = msglocal.MsgLocal()
196 config_message.pop("group_id", None)
197 self.msg_admin.connect(config_message)
198 elif config_message["driver"] == "kafka":
199 self.msg = msgkafka.MsgKafka()
200 self.msg.connect(config_message)
201 self.msg_admin = msgkafka.MsgKafka()
202 config_message.pop("group_id", None)
203 self.msg_admin.connect(config_message)
204 else:
205 raise LcmException(
206 "Invalid configuration param '{}' at '[message]':'driver'".format(
207 config["message"]["driver"]
208 )
209 )
210 except (DbException, FsException, MsgException) as e:
211 self.logger.critical(str(e), exc_info=True)
212 raise LcmException(str(e))
213
214 # registry of created tasks/futures, kept so that they can be cancelled
215 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
216
217 if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
218 if self.config["tsdb"]["driver"] == "prometheus":
219 self.prometheus = prometheus.Prometheus(
220 self.config["tsdb"], self.worker_id, self.loop
221 )
222 else:
223 raise LcmException(
224 "Invalid configuration param '{}' at '[tsdb]':'driver'".format(
225 config["tsdb"]["driver"]
226 )
227 )
228 else:
229 self.prometheus = None
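# Typical life cycle, mirroring __main__ below: the instance is built from a config file and
# then start() is called, which blocks until the kafka tasks finish. For example:
#     lcm = Lcm("/etc/osm/lcm.cfg")
#     lcm.start()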
230
231 async def check_RO_version(self):
232 tries = 14
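# up to 14 attempts, 5 seconds apart (roughly a minute), before giving up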
233 last_error = None
234 while True:
235 ro_uri = self.config["ro_config"]["uri"]
236 try:
237 # try the new-generation RO first; if that fails, fall back to the old RO
238 try:
239 self.config["ro_config"]["uri"] = ro_uri + "ro"
240 ro_server = NgRoClient(self.loop, **self.config["ro_config"])
241 ro_version = await ro_server.get_version()
242 self.config["ro_config"]["ng"] = True
243 except Exception:
244 self.config["ro_config"]["uri"] = ro_uri + "openmano"
245 ro_server = ROClient(self.loop, **self.config["ro_config"])
246 ro_version = await ro_server.get_version()
247 self.config["ro_config"]["ng"] = False
248 if versiontuple(ro_version) < versiontuple(min_RO_version):
249 raise LcmException(
250 "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
251 ro_version, min_RO_version
252 )
253 )
254 self.logger.info(
255 "Connected to RO version {} (new generation: {})".format(
256 ro_version, self.config["ro_config"]["ng"]
257 )
258 )
259 return
260 except (ROClientException, NgRoException) as e:
261 self.config["ro_config"]["uri"] = ro_uri
262 tries -= 1
263 traceback.print_tb(e.__traceback__)
264 error_text = "Error while connecting to RO on {}: {}".format(
265 self.config["ro_config"]["uri"], e
266 )
267 if tries <= 0:
268 self.logger.critical(error_text)
269 raise LcmException(error_text)
270 if last_error != error_text:
271 last_error = error_text
272 self.logger.error(
273 error_text + ". Retrying; up to {} seconds left before giving up".format(5 * tries)
274 )
275 await asyncio.sleep(5)
276
277 async def test(self, param=None):
278 self.logger.debug("Starting/Ending test task: {}".format(param))
279
280 async def kafka_ping(self):
281 self.logger.debug("Task kafka_ping Enter")
282 consecutive_errors = 0
283 first_start = True
284 kafka_has_received = False
285 self.pings_not_received = 1
286 while True:
287 try:
288 await self.msg_admin.aiowrite(
289 "admin",
290 "ping",
291 {
292 "from": "lcm",
293 "to": "lcm",
294 "worker_id": self.worker_id,
295 "version": lcm_version,
296 },
297 self.loop,
298 )
299 # the time between pings is short while no ping has been received yet and right after starting
300 wait_time = (
301 self.ping_interval_boot
302 if not kafka_has_received
303 else self.ping_interval_pace
304 )
305 if not self.pings_not_received:
306 kafka_has_received = True
307 self.pings_not_received += 1
308 await asyncio.sleep(wait_time, loop=self.loop)
309 if self.pings_not_received > 10:
310 raise LcmException("It is not receiving pings from Kafka bus")
311 consecutive_errors = 0
312 first_start = False
313 except LcmException:
314 raise
315 except Exception as e:
316 # right after starting, tolerate more consecutive errors and wait longer,
317 # to give kafka time to start
318 if consecutive_errors == (8 if not first_start else 30):
319 self.logger.error(
320 "Task kafka_ping exiting, too many consecutive errors. Exception: {}".format(
321 e
322 )
323 )
324 raise
325 consecutive_errors += 1
326 self.logger.error(
327 "Task kafka_ping retrying after exception: {}".format(e)
328 )
329 wait_time = 2 if not first_start else 5
330 await asyncio.sleep(wait_time, loop=self.loop)
331
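# Dispatcher for every message read from the kafka bus. Apart from the generic commands
# (exit, echo, test and the admin ping used for the healthcheck), each (topic, command) pair
# spawns an asyncio task on the corresponding handler (ns, netslice, vim, wim, sdn, k8scluster,
# vca, k8srepo) and registers it in self.lcm_tasks so that it can be cancelled later.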
332 def kafka_read_callback(self, topic, command, params):
333 order_id = 1
334
335 if topic != "admin" and command != "ping":
336 self.logger.debug(
337 "Task kafka_read receives {} {}: {}".format(topic, command, params)
338 )
339 self.consecutive_errors = 0
340 self.first_start = False
341 order_id += 1
342 if command == "exit":
343 raise LcmExceptionExit
344 elif command.startswith("#"):
345 return
346 elif command == "echo":
347 # just for test
348 print(params)
349 sys.stdout.flush()
350 return
351 elif command == "test":
352 asyncio.Task(self.test(params), loop=self.loop)
353 return
354
355 if topic == "admin":
356 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
357 if params.get("worker_id") != self.worker_id:
358 return
359 self.pings_not_received = 0
360 try:
361 with open(health_check_file, "w") as f:
362 f.write(str(time()))
363 except Exception as e:
364 self.logger.error(
365 "Cannot write into '{}' for healthcheck: {}".format(
366 health_check_file, e
367 )
368 )
369 return
370 elif topic == "pla":
371 if command == "placement":
372 self.ns.update_nsrs_with_pla_result(params)
373 return
374 elif topic == "k8scluster":
375 if command == "create" or command == "created":
376 k8scluster_id = params.get("_id")
377 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
378 self.lcm_tasks.register(
379 "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
380 )
381 return
382 elif command == "delete" or command == "deleted":
383 k8scluster_id = params.get("_id")
384 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
385 self.lcm_tasks.register(
386 "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
387 )
388 return
389 elif topic == "vca":
390 if command == "create" or command == "created":
391 vca_id = params.get("_id")
392 task = asyncio.ensure_future(self.vca.create(params, order_id))
393 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
394 return
395 elif command == "delete" or command == "deleted":
396 vca_id = params.get("_id")
397 task = asyncio.ensure_future(self.vca.delete(params, order_id))
398 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
399 return
400 elif topic == "k8srepo":
401 if command == "create" or command == "created":
402 k8srepo_id = params.get("_id")
403 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
404 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
405 self.lcm_tasks.register(
406 "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
407 )
408 return
409 elif command == "delete" or command == "deleted":
410 k8srepo_id = params.get("_id")
411 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
412 self.lcm_tasks.register(
413 "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
414 )
415 return
416 elif topic == "ns":
417 if command == "instantiate":
418 # self.logger.debug("Deploying NS {}".format(nsr_id))
419 nslcmop = params
420 nslcmop_id = nslcmop["_id"]
421 nsr_id = nslcmop["nsInstanceId"]
422 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
423 self.lcm_tasks.register(
424 "ns", nsr_id, nslcmop_id, "ns_instantiate", task
425 )
426 return
427 elif command == "terminate":
428 # self.logger.debug("Deleting NS {}".format(nsr_id))
429 nslcmop = params
430 nslcmop_id = nslcmop["_id"]
431 nsr_id = nslcmop["nsInstanceId"]
432 self.lcm_tasks.cancel(topic, nsr_id)
433 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
434 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
435 return
436 elif command == "vca_status_refresh":
437 nslcmop = params
438 nslcmop_id = nslcmop["_id"]
439 nsr_id = nslcmop["nsInstanceId"]
440 task = asyncio.ensure_future(
441 self.ns.vca_status_refresh(nsr_id, nslcmop_id)
442 )
443 self.lcm_tasks.register(
444 "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
445 )
446 return
447 elif command == "action":
448 # self.logger.debug("Update NS {}".format(nsr_id))
449 nslcmop = params
450 nslcmop_id = nslcmop["_id"]
451 nsr_id = nslcmop["nsInstanceId"]
452 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
453 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
454 return
455 elif command == "scale":
456 # self.logger.debug("Update NS {}".format(nsr_id))
457 nslcmop = params
458 nslcmop_id = nslcmop["_id"]
459 nsr_id = nslcmop["nsInstanceId"]
460 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
461 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
462 return
463 elif command == "show":
464 nsr_id = params
465 try:
466 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
467 print(
468 "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
469 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
470 "".format(
471 nsr_id,
472 db_nsr["operational-status"],
473 db_nsr["config-status"],
474 db_nsr["detailed-status"],
475 db_nsr["_admin"]["deployed"],
476 self.lcm_ns_tasks.get(nsr_id),
477 )
478 )
479 except Exception as e:
480 print("nsr {} not found: {}".format(nsr_id, e))
481 sys.stdout.flush()
482 return
483 elif command == "deleted":
484 return # TODO cleaning of task just in case should be done
485 elif command in (
486 "terminated",
487 "instantiated",
488 "scaled",
489 "actioned",
490 ): # "scaled-cooldown-time"
491 return
492 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
493 if command == "instantiate":
494 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
495 nsilcmop = params
496 nsilcmop_id = nsilcmop["_id"] # slice operation id
497 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
498 task = asyncio.ensure_future(
499 self.netslice.instantiate(nsir_id, nsilcmop_id)
500 )
501 self.lcm_tasks.register(
502 "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
503 )
504 return
505 elif command == "terminate":
506 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
507 nsilcmop = params
508 nsilcmop_id = nsilcmop["_id"] # slice operation id
509 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
510 self.lcm_tasks.cancel(topic, nsir_id)
511 task = asyncio.ensure_future(
512 self.netslice.terminate(nsir_id, nsilcmop_id)
513 )
514 self.lcm_tasks.register(
515 "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
516 )
517 return
518 elif command == "show":
519 nsir_id = params
520 try:
521 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
522 print(
523 "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
524 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
525 "".format(
526 nsir_id,
527 db_nsir["operational-status"],
528 db_nsir["config-status"],
529 db_nsir["detailed-status"],
530 db_nsir["_admin"]["deployed"],
531 self.lcm_netslice_tasks.get(nsir_id),
532 )
533 )
534 except Exception as e:
535 print("nsir {} not found: {}".format(nsir_id, e))
536 sys.stdout.flush()
537 return
538 elif command == "deleted":
539 return # TODO cleaning of task just in case should be done
540 elif command in (
541 "terminated",
542 "instantiated",
543 "scaled",
544 "actioned",
545 ): # "scaled-cooldown-time"
546 return
547 elif topic == "vim_account":
548 vim_id = params["_id"]
549 if command in ("create", "created"):
550 if not self.config["ro_config"].get("ng"):
551 task = asyncio.ensure_future(self.vim.create(params, order_id))
552 self.lcm_tasks.register(
553 "vim_account", vim_id, order_id, "vim_create", task
554 )
555 return
556 elif command == "delete" or command == "deleted":
557 self.lcm_tasks.cancel(topic, vim_id)
558 task = asyncio.ensure_future(self.vim.delete(params, order_id))
559 self.lcm_tasks.register(
560 "vim_account", vim_id, order_id, "vim_delete", task
561 )
562 return
563 elif command == "show":
564 print("not implemented show with vim_account")
565 sys.stdout.flush()
566 return
567 elif command in ("edit", "edited"):
568 if not self.config["ro_config"].get("ng"):
569 task = asyncio.ensure_future(self.vim.edit(params, order_id))
570 self.lcm_tasks.register(
571 "vim_account", vim_id, order_id, "vim_edit", task
572 )
573 return
574 elif command == "deleted":
575 return # TODO cleaning of task just in case should be done
576 elif topic == "wim_account":
577 wim_id = params["_id"]
578 if command in ("create", "created"):
579 if not self.config["ro_config"].get("ng"):
580 task = asyncio.ensure_future(self.wim.create(params, order_id))
581 self.lcm_tasks.register(
582 "wim_account", wim_id, order_id, "wim_create", task
583 )
584 return
585 elif command == "delete" or command == "deleted":
586 self.lcm_tasks.cancel(topic, wim_id)
587 task = asyncio.ensure_future(self.wim.delete(params, order_id))
588 self.lcm_tasks.register(
589 "wim_account", wim_id, order_id, "wim_delete", task
590 )
591 return
592 elif command == "show":
593 print("not implemented show with wim_account")
594 sys.stdout.flush()
595 return
596 elif command in ("edit", "edited"):
597 task = asyncio.ensure_future(self.wim.edit(params, order_id))
598 self.lcm_tasks.register(
599 "wim_account", wim_id, order_id, "wim_edit", task
600 )
601 return
602 elif command == "deleted":
603 return # TODO cleaning of task just in case should be done
604 elif topic == "sdn":
605 _sdn_id = params["_id"]
606 if command in ("create", "created"):
607 if not self.config["ro_config"].get("ng"):
608 task = asyncio.ensure_future(self.sdn.create(params, order_id))
609 self.lcm_tasks.register(
610 "sdn", _sdn_id, order_id, "sdn_create", task
611 )
612 return
613 elif command == "delete" or command == "deleted":
614 self.lcm_tasks.cancel(topic, _sdn_id)
615 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
616 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
617 return
618 elif command in ("edit", "edited"):
619 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
620 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
621 return
622 elif command == "deleted":
623 return # TODO cleaning of task just in case should be done
624 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
625
626 async def kafka_read(self):
627 self.logger.debug(
628 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
629 )
630 # future = asyncio.Future()
631 self.consecutive_errors = 0
632 self.first_start = True
633 while self.consecutive_errors < 10:
634 try:
635 topics = (
636 "ns",
637 "vim_account",
638 "wim_account",
639 "sdn",
640 "nsi",
641 "k8scluster",
642 "vca",
643 "k8srepo",
644 "pla",
645 )
646 topics_admin = ("admin",)
647 await asyncio.gather(
648 self.msg.aioread(
649 topics, self.loop, self.kafka_read_callback, from_beginning=True
650 ),
651 self.msg_admin.aioread(
652 topics_admin,
653 self.loop,
654 self.kafka_read_callback,
655 group_id=False,
656 ),
657 )
658
659 except LcmExceptionExit:
660 self.logger.debug("Bye!")
661 break
662 except Exception as e:
663 # right after starting, tolerate more consecutive errors and wait longer,
664 # to give kafka time to start
665 if self.consecutive_errors == (8 if not self.first_start else 30):
666 self.logger.error(
667 "Task kafka_read exiting, too many consecutive errors. Exception: {}".format(
668 e
669 )
670 )
671 raise
672 self.consecutive_errors += 1
673 self.logger.error(
674 "Task kafka_read retrying after Exception {}".format(e)
675 )
676 wait_time = 2 if not self.first_start else 5
677 await asyncio.sleep(wait_time, loop=self.loop)
678
679 # self.logger.debug("Task kafka_read terminating")
680 self.logger.debug("Task kafka_read exit")
681
682 def start(self):
683
684 # check RO version
685 self.loop.run_until_complete(self.check_RO_version())
686
687 self.ns = ns.NsLcm(
688 self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus
689 )
690 self.netslice = netslice.NetsliceLcm(
691 self.msg, self.lcm_tasks, self.config, self.loop, self.ns
692 )
693 self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
694 self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
695 self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
696 self.k8scluster = vim_sdn.K8sClusterLcm(
697 self.msg, self.lcm_tasks, self.config, self.loop
698 )
699 self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
700 self.k8srepo = vim_sdn.K8sRepoLcm(
701 self.msg, self.lcm_tasks, self.config, self.loop
702 )
703
704 # configure tsdb prometheus
705 if self.prometheus:
706 self.loop.run_until_complete(self.prometheus.start())
707
708 self.loop.run_until_complete(
709 asyncio.gather(self.kafka_read(), self.kafka_ping())
710 )
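# The event loop runs here until the kafka tasks finish or raise; everything below is cleanup
# of the loop and of the database/message/filesystem connections.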
711 # TODO
712 # self.logger.debug("Terminating cancelling creation tasks")
713 # self.lcm_tasks.cancel("ALL", "create")
714 # timeout = 200
715 # while self.is_pending_tasks():
716 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
717 # await asyncio.sleep(2, loop=self.loop)
718 # timeout -= 2
719 # if not timeout:
720 # self.lcm_tasks.cancel("ALL", "ALL")
721 self.loop.close()
722 self.loop = None
723 if self.db:
724 self.db.db_disconnect()
725 if self.msg:
726 self.msg.disconnect()
727 if self.msg_admin:
728 self.msg_admin.disconnect()
729 if self.fs:
730 self.fs.fs_disconnect()
731
732 def read_config_file(self, config_file):
733 # TODO make a combined [ini] + yaml parser
734 # the configparser library is not suitable because it does not allow comments at the end of a line
735 # and does not parse integers or booleans
736 try:
737 # read file as yaml format
738 with open(config_file) as f:
739 conf = yaml.load(f, Loader=yaml.Loader)
740 # Ensure all expected sections exist (replace missing or empty sections with an empty dict)
741 for k in (
742 "global",
743 "timeout",
744 "RO",
745 "VCA",
746 "database",
747 "storage",
748 "message",
749 ):
750 if not conf.get(k):
751 conf[k] = {}
752
753 # read all environment variables that start with OSMLCM_
754 for k, v in environ.items():
755 if not k.startswith("OSMLCM_"):
756 continue
757 subject, _, item = k[7:].lower().partition("_")
758 if not item:
759 continue
760 if subject in ("ro", "vca"):
761 # the RO and VCA sections use upper-case names
762 subject = subject.upper()
763 try:
764 if item == "port" or subject == "timeout":
765 conf[subject][item] = int(v)
766 else:
767 conf[subject][item] = v
768 except Exception as e:
769 self.logger.warning(
770 "skipping environ '{}' on exception '{}'".format(k, e)
771 )
772
773 # backward compatibility of VCA parameters
774
775 if "pubkey" in conf["VCA"]:
776 conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
777 if "cacert" in conf["VCA"]:
778 conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
779 if "apiproxy" in conf["VCA"]:
780 conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")
781
782 if "enableosupgrade" in conf["VCA"]:
783 conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
784 if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
785 if conf["VCA"]["enable_os_upgrade"].lower() == "false":
786 conf["VCA"]["enable_os_upgrade"] = False
787 elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
788 conf["VCA"]["enable_os_upgrade"] = True
789
790 if "aptmirror" in conf["VCA"]:
791 conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")
792
793 return conf
794 except Exception as e:
795 self.logger.critical("At config file '{}': {}".format(config_file, e))
796 exit(1)
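# Rough sketch of the expected lcm.cfg layout (illustrative only; the section names come from
# the checks above and only keys handled in this file are shown, the database/storage/message
# drivers consume their own sections):
#
#     global:   {loglevel: DEBUG, logfile: /var/log/osm/lcm.log}
#     RO:       {host: ro, port: 9090}      # or a full "uri", plus optional "ng"/"loglevel"
#     VCA:      {public_key: ..., ca_cert: ..., api_proxy: ...}
#     timeout:  {...}
#     database: {...}
#     storage:  {...}
#     message:  {driver: kafka, group_id: lcm-server, ...}
#     tsdb:     {driver: prometheus, ...}
#
# Any value can also be injected through the environment: OSMLCM_RO_HOST=ro ends up in
# conf["RO"]["host"] and OSMLCM_DATABASE_HOST in conf["database"]["host"].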
797
798 @staticmethod
799 def get_process_id():
800 """
801 Obtain a unique ID for this process. If running inside docker it will return the container ID;
802 otherwise it will provide a random one
803 :return: Obtained ID
804 """
805 # Try getting the docker container id; if that fails, fall back to a random id
806 try:
807 with open("/proc/self/cgroup", "r") as f:
808 text_id_ = f.readline()
809 _, _, text_id = text_id_.rpartition("/")
810 text_id = text_id.replace("\n", "")[:12]
811 if text_id:
812 return text_id
813 except Exception:
814 pass
815 # Return a random id
816 return "".join(random_choice("0123456789abcdef") for _ in range(12))
817
818
819 def usage():
820 print(
821 """Usage: {} [options]
822 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg, then /etc/osm/lcm.cfg)
823 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
824 -h|--help: shows this help
825 """.format(
826 sys.argv[0]
827 )
828 )
829 # --log-socket-host HOST: send logs to this host")
830 # --log-socket-port PORT: send logs using this port (default: 9022)")
831
832
833 if __name__ == "__main__":
834
835 try:
836 # print("SYS.PATH='{}'".format(sys.path))
837 # load parameters and configuration
838 # -h
839 # -c value
840 # --config value
841 # --help
842 # --health-check
843 opts, args = getopt.getopt(
844 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
845 )
846 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
847 config_file = None
848 for o, a in opts:
849 if o in ("-h", "--help"):
850 usage()
851 sys.exit()
852 elif o in ("-c", "--config"):
853 config_file = a
854 elif o == "--health-check":
855 from osm_lcm.lcm_hc import health_check
856
857 health_check(health_check_file, Lcm.ping_interval_pace)
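# health_check() (in osm_lcm.lcm_hc) is given the healthcheck file written by the ping handler
# and the ping pace; it is expected to report health based on how recent that timestamp is,
# and it does not start a full LCM instance.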
858 # elif o == "--log-socket-port":
859 # log_socket_port = a
860 # elif o == "--log-socket-host":
861 # log_socket_host = a
862 # elif o == "--log-file":
863 # log_file = a
864 else:
865 assert False, "Unhandled option"
866
867 if config_file:
868 if not path.isfile(config_file):
869 print(
870 "configuration file '{}' does not exist".format(config_file),
871 file=sys.stderr,
872 )
873 exit(1)
874 else:
875 for config_file in (
876 __file__[: __file__.rfind(".")] + ".cfg",
877 "./lcm.cfg",
878 "/etc/osm/lcm.cfg",
879 ):
880 if path.isfile(config_file):
881 break
882 else:
883 print(
884 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
885 file=sys.stderr,
886 )
887 exit(1)
888 lcm = Lcm(config_file)
889 lcm.start()
890 except (LcmException, getopt.GetoptError) as e:
891 print(str(e), file=sys.stderr)
892 # usage()
893 exit(1)