Fix bug 1606 - Adding fs.sync() at startup
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from os import environ, path
48 from random import choice as random_choice
49 from n2vc import version as n2vc_version
50 import traceback
51
# Optional import-time debugging: exporting OSMLCM_PDB_DEBUG (any value) drops
# the process into the pdb debugger as soon as this module is imported.
if "OSMLCM_PDB_DEBUG" in os.environ:
    pdb.set_trace()
54
55
__author__ = "Alfonso Tierno"

# minimum compatible versions of the peer OSM components
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
# timestamp file refreshed on every received self-ping; read by the external
# health-check probe. TODO find better location for this file
health_check_file = path.expanduser("~") + "/time_last_ping"
64
65
class Lcm:
    """Life Cycle Management engine: consumes operation requests from the kafka
    bus and dispatches them to the per-topic LCM sub-engines."""

    # seconds between self-pings once the bus is confirmed to be healthy
    ping_interval_pace = 120
    # seconds between self-pings while booting / bus not yet confirmed
    ping_interval_boot = 5
    # for each section of lcm.cfg, the logger name to be used
    cfg_logger_name = {
        "message": "lcm.msg",
        "database": "lcm.db",
        "storage": "lcm.fs",
        "tsdb": "lcm.prometheus",
    }
80 def __init__(self, config_file, loop=None):
81 """
82 Init, Connect to database, filesystem storage, and messaging
83 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 :return: None
85 """
86 self.db = None
87 self.msg = None
88 self.msg_admin = None
89 self.fs = None
90 self.pings_not_received = 1
91 self.consecutive_errors = 0
92 self.first_start = False
93
94 # logging
95 self.logger = logging.getLogger("lcm")
96 # get id
97 self.worker_id = self.get_process_id()
98 # load configuration
99 config = self.read_config_file(config_file)
100 self.config = config
101 self.config["ro_config"] = {
102 "ng": config["RO"].get("ng", False),
103 "uri": config["RO"].get("uri"),
104 "tenant": config.get("tenant", "osm"),
105 "logger_name": "lcm.roclient",
106 "loglevel": config["RO"].get("loglevel", "ERROR"),
107 }
108 if not self.config["ro_config"]["uri"]:
109 self.config["ro_config"]["uri"] = "http://{}:{}/".format(
110 config["RO"]["host"], config["RO"]["port"]
111 )
112 elif (
113 "/ro" in self.config["ro_config"]["uri"][-4:]
114 or "/openmano" in self.config["ro_config"]["uri"][-10:]
115 ):
116 # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
117 index = self.config["ro_config"]["uri"][-1].rfind("/")
118 self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index + 1]
119
120 self.loop = loop or asyncio.get_event_loop()
121 self.ns = (
122 self.netslice
123 ) = (
124 self.vim
125 ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
126
127 # logging
128 log_format_simple = (
129 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
130 )
131 log_formatter_simple = logging.Formatter(
132 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
133 )
134 config["database"]["logger_name"] = "lcm.db"
135 config["storage"]["logger_name"] = "lcm.fs"
136 config["message"]["logger_name"] = "lcm.msg"
137 if config["global"].get("logfile"):
138 file_handler = logging.handlers.RotatingFileHandler(
139 config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
140 )
141 file_handler.setFormatter(log_formatter_simple)
142 self.logger.addHandler(file_handler)
143 if not config["global"].get("nologging"):
144 str_handler = logging.StreamHandler()
145 str_handler.setFormatter(log_formatter_simple)
146 self.logger.addHandler(str_handler)
147
148 if config["global"].get("loglevel"):
149 self.logger.setLevel(config["global"]["loglevel"])
150
151 # logging other modules
152 for k1, logname in self.cfg_logger_name.items():
153 config[k1]["logger_name"] = logname
154 logger_module = logging.getLogger(logname)
155 if config[k1].get("logfile"):
156 file_handler = logging.handlers.RotatingFileHandler(
157 config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
158 )
159 file_handler.setFormatter(log_formatter_simple)
160 logger_module.addHandler(file_handler)
161 if config[k1].get("loglevel"):
162 logger_module.setLevel(config[k1]["loglevel"])
163 self.logger.critical(
164 "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
165 )
166
167 # check version of N2VC
168 # TODO enhance with int conversion or from distutils.version import LooseVersion
169 # or with list(map(int, version.split(".")))
170 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
171 raise LcmException(
172 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
173 n2vc_version, min_n2vc_version
174 )
175 )
176 # check version of common
177 if versiontuple(common_version) < versiontuple(min_common_version):
178 raise LcmException(
179 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
180 common_version, min_common_version
181 )
182 )
183
184 try:
185 self.db = Database(config).instance.db
186
187 self.fs = Filesystem(config).instance.fs
188 self.fs.sync()
189
190 # copy message configuration in order to remove 'group_id' for msg_admin
191 config_message = config["message"].copy()
192 config_message["loop"] = self.loop
193 if config_message["driver"] == "local":
194 self.msg = msglocal.MsgLocal()
195 self.msg.connect(config_message)
196 self.msg_admin = msglocal.MsgLocal()
197 config_message.pop("group_id", None)
198 self.msg_admin.connect(config_message)
199 elif config_message["driver"] == "kafka":
200 self.msg = msgkafka.MsgKafka()
201 self.msg.connect(config_message)
202 self.msg_admin = msgkafka.MsgKafka()
203 config_message.pop("group_id", None)
204 self.msg_admin.connect(config_message)
205 else:
206 raise LcmException(
207 "Invalid configuration param '{}' at '[message]':'driver'".format(
208 config["message"]["driver"]
209 )
210 )
211 except (DbException, FsException, MsgException) as e:
212 self.logger.critical(str(e), exc_info=True)
213 raise LcmException(str(e))
214
215 # contains created tasks/futures to be able to cancel
216 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
217
218 if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
219 if self.config["tsdb"]["driver"] == "prometheus":
220 self.prometheus = prometheus.Prometheus(
221 self.config["tsdb"], self.worker_id, self.loop
222 )
223 else:
224 raise LcmException(
225 "Invalid configuration param '{}' at '[tsdb]':'driver'".format(
226 config["tsdb"]["driver"]
227 )
228 )
229 else:
230 self.prometheus = None
231
232 async def check_RO_version(self):
233 tries = 14
234 last_error = None
235 while True:
236 ro_uri = self.config["ro_config"]["uri"]
237 try:
238 # try new RO, if fail old RO
239 try:
240 self.config["ro_config"]["uri"] = ro_uri + "ro"
241 ro_server = NgRoClient(self.loop, **self.config["ro_config"])
242 ro_version = await ro_server.get_version()
243 self.config["ro_config"]["ng"] = True
244 except Exception:
245 self.config["ro_config"]["uri"] = ro_uri + "openmano"
246 ro_server = ROClient(self.loop, **self.config["ro_config"])
247 ro_version = await ro_server.get_version()
248 self.config["ro_config"]["ng"] = False
249 if versiontuple(ro_version) < versiontuple(min_RO_version):
250 raise LcmException(
251 "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
252 ro_version, min_RO_version
253 )
254 )
255 self.logger.info(
256 "Connected to RO version {} new-generation version {}".format(
257 ro_version, self.config["ro_config"]["ng"]
258 )
259 )
260 return
261 except (ROClientException, NgRoException) as e:
262 self.config["ro_config"]["uri"] = ro_uri
263 tries -= 1
264 traceback.print_tb(e.__traceback__)
265 error_text = "Error while connecting to RO on {}: {}".format(
266 self.config["ro_config"]["uri"], e
267 )
268 if tries <= 0:
269 self.logger.critical(error_text)
270 raise LcmException(error_text)
271 if last_error != error_text:
272 last_error = error_text
273 self.logger.error(
274 error_text + ". Waiting until {} seconds".format(5 * tries)
275 )
276 await asyncio.sleep(5)
277
278 async def test(self, param=None):
279 self.logger.debug("Starting/Ending test task: {}".format(param))
280
281 async def kafka_ping(self):
282 self.logger.debug("Task kafka_ping Enter")
283 consecutive_errors = 0
284 first_start = True
285 kafka_has_received = False
286 self.pings_not_received = 1
287 while True:
288 try:
289 await self.msg_admin.aiowrite(
290 "admin",
291 "ping",
292 {
293 "from": "lcm",
294 "to": "lcm",
295 "worker_id": self.worker_id,
296 "version": lcm_version,
297 },
298 self.loop,
299 )
300 # time between pings are low when it is not received and at starting
301 wait_time = (
302 self.ping_interval_boot
303 if not kafka_has_received
304 else self.ping_interval_pace
305 )
306 if not self.pings_not_received:
307 kafka_has_received = True
308 self.pings_not_received += 1
309 await asyncio.sleep(wait_time, loop=self.loop)
310 if self.pings_not_received > 10:
311 raise LcmException("It is not receiving pings from Kafka bus")
312 consecutive_errors = 0
313 first_start = False
314 except LcmException:
315 raise
316 except Exception as e:
317 # if not first_start is the first time after starting. So leave more time and wait
318 # to allow kafka starts
319 if consecutive_errors == 8 if not first_start else 30:
320 self.logger.error(
321 "Task kafka_read task exit error too many errors. Exception: {}".format(
322 e
323 )
324 )
325 raise
326 consecutive_errors += 1
327 self.logger.error(
328 "Task kafka_read retrying after Exception {}".format(e)
329 )
330 wait_time = 2 if not first_start else 5
331 await asyncio.sleep(wait_time, loop=self.loop)
332
333 def kafka_read_callback(self, topic, command, params):
334 order_id = 1
335
336 if topic != "admin" and command != "ping":
337 self.logger.debug(
338 "Task kafka_read receives {} {}: {}".format(topic, command, params)
339 )
340 self.consecutive_errors = 0
341 self.first_start = False
342 order_id += 1
343 if command == "exit":
344 raise LcmExceptionExit
345 elif command.startswith("#"):
346 return
347 elif command == "echo":
348 # just for test
349 print(params)
350 sys.stdout.flush()
351 return
352 elif command == "test":
353 asyncio.Task(self.test(params), loop=self.loop)
354 return
355
356 if topic == "admin":
357 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
358 if params.get("worker_id") != self.worker_id:
359 return
360 self.pings_not_received = 0
361 try:
362 with open(health_check_file, "w") as f:
363 f.write(str(time()))
364 except Exception as e:
365 self.logger.error(
366 "Cannot write into '{}' for healthcheck: {}".format(
367 health_check_file, e
368 )
369 )
370 return
371 elif topic == "pla":
372 if command == "placement":
373 self.ns.update_nsrs_with_pla_result(params)
374 return
375 elif topic == "k8scluster":
376 if command == "create" or command == "created":
377 k8scluster_id = params.get("_id")
378 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
379 self.lcm_tasks.register(
380 "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
381 )
382 return
383 elif command == "delete" or command == "deleted":
384 k8scluster_id = params.get("_id")
385 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
386 self.lcm_tasks.register(
387 "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
388 )
389 return
390 elif topic == "vca":
391 if command == "create" or command == "created":
392 vca_id = params.get("_id")
393 task = asyncio.ensure_future(self.vca.create(params, order_id))
394 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
395 return
396 elif command == "delete" or command == "deleted":
397 vca_id = params.get("_id")
398 task = asyncio.ensure_future(self.vca.delete(params, order_id))
399 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
400 return
401 elif topic == "k8srepo":
402 if command == "create" or command == "created":
403 k8srepo_id = params.get("_id")
404 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
405 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
406 self.lcm_tasks.register(
407 "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
408 )
409 return
410 elif command == "delete" or command == "deleted":
411 k8srepo_id = params.get("_id")
412 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
413 self.lcm_tasks.register(
414 "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
415 )
416 return
417 elif topic == "ns":
418 if command == "instantiate":
419 # self.logger.debug("Deploying NS {}".format(nsr_id))
420 nslcmop = params
421 nslcmop_id = nslcmop["_id"]
422 nsr_id = nslcmop["nsInstanceId"]
423 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
424 self.lcm_tasks.register(
425 "ns", nsr_id, nslcmop_id, "ns_instantiate", task
426 )
427 return
428 elif command == "terminate":
429 # self.logger.debug("Deleting NS {}".format(nsr_id))
430 nslcmop = params
431 nslcmop_id = nslcmop["_id"]
432 nsr_id = nslcmop["nsInstanceId"]
433 self.lcm_tasks.cancel(topic, nsr_id)
434 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
435 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
436 return
437 elif command == "vca_status_refresh":
438 nslcmop = params
439 nslcmop_id = nslcmop["_id"]
440 nsr_id = nslcmop["nsInstanceId"]
441 task = asyncio.ensure_future(
442 self.ns.vca_status_refresh(nsr_id, nslcmop_id)
443 )
444 self.lcm_tasks.register(
445 "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
446 )
447 return
448 elif command == "action":
449 # self.logger.debug("Update NS {}".format(nsr_id))
450 nslcmop = params
451 nslcmop_id = nslcmop["_id"]
452 nsr_id = nslcmop["nsInstanceId"]
453 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
454 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
455 return
456 elif command == "scale":
457 # self.logger.debug("Update NS {}".format(nsr_id))
458 nslcmop = params
459 nslcmop_id = nslcmop["_id"]
460 nsr_id = nslcmop["nsInstanceId"]
461 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
462 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
463 return
464 elif command == "show":
465 nsr_id = params
466 try:
467 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
468 print(
469 "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
470 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
471 "".format(
472 nsr_id,
473 db_nsr["operational-status"],
474 db_nsr["config-status"],
475 db_nsr["detailed-status"],
476 db_nsr["_admin"]["deployed"],
477 self.lcm_ns_tasks.get(nsr_id),
478 )
479 )
480 except Exception as e:
481 print("nsr {} not found: {}".format(nsr_id, e))
482 sys.stdout.flush()
483 return
484 elif command == "deleted":
485 return # TODO cleaning of task just in case should be done
486 elif command in (
487 "terminated",
488 "instantiated",
489 "scaled",
490 "actioned",
491 ): # "scaled-cooldown-time"
492 return
493 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
494 if command == "instantiate":
495 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
496 nsilcmop = params
497 nsilcmop_id = nsilcmop["_id"] # slice operation id
498 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
499 task = asyncio.ensure_future(
500 self.netslice.instantiate(nsir_id, nsilcmop_id)
501 )
502 self.lcm_tasks.register(
503 "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
504 )
505 return
506 elif command == "terminate":
507 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
508 nsilcmop = params
509 nsilcmop_id = nsilcmop["_id"] # slice operation id
510 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
511 self.lcm_tasks.cancel(topic, nsir_id)
512 task = asyncio.ensure_future(
513 self.netslice.terminate(nsir_id, nsilcmop_id)
514 )
515 self.lcm_tasks.register(
516 "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
517 )
518 return
519 elif command == "show":
520 nsir_id = params
521 try:
522 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
523 print(
524 "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
525 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
526 "".format(
527 nsir_id,
528 db_nsir["operational-status"],
529 db_nsir["config-status"],
530 db_nsir["detailed-status"],
531 db_nsir["_admin"]["deployed"],
532 self.lcm_netslice_tasks.get(nsir_id),
533 )
534 )
535 except Exception as e:
536 print("nsir {} not found: {}".format(nsir_id, e))
537 sys.stdout.flush()
538 return
539 elif command == "deleted":
540 return # TODO cleaning of task just in case should be done
541 elif command in (
542 "terminated",
543 "instantiated",
544 "scaled",
545 "actioned",
546 ): # "scaled-cooldown-time"
547 return
548 elif topic == "vim_account":
549 vim_id = params["_id"]
550 if command in ("create", "created"):
551 if not self.config["ro_config"].get("ng"):
552 task = asyncio.ensure_future(self.vim.create(params, order_id))
553 self.lcm_tasks.register(
554 "vim_account", vim_id, order_id, "vim_create", task
555 )
556 return
557 elif command == "delete" or command == "deleted":
558 self.lcm_tasks.cancel(topic, vim_id)
559 task = asyncio.ensure_future(self.vim.delete(params, order_id))
560 self.lcm_tasks.register(
561 "vim_account", vim_id, order_id, "vim_delete", task
562 )
563 return
564 elif command == "show":
565 print("not implemented show with vim_account")
566 sys.stdout.flush()
567 return
568 elif command in ("edit", "edited"):
569 if not self.config["ro_config"].get("ng"):
570 task = asyncio.ensure_future(self.vim.edit(params, order_id))
571 self.lcm_tasks.register(
572 "vim_account", vim_id, order_id, "vim_edit", task
573 )
574 return
575 elif command == "deleted":
576 return # TODO cleaning of task just in case should be done
577 elif topic == "wim_account":
578 wim_id = params["_id"]
579 if command in ("create", "created"):
580 if not self.config["ro_config"].get("ng"):
581 task = asyncio.ensure_future(self.wim.create(params, order_id))
582 self.lcm_tasks.register(
583 "wim_account", wim_id, order_id, "wim_create", task
584 )
585 return
586 elif command == "delete" or command == "deleted":
587 self.lcm_tasks.cancel(topic, wim_id)
588 task = asyncio.ensure_future(self.wim.delete(params, order_id))
589 self.lcm_tasks.register(
590 "wim_account", wim_id, order_id, "wim_delete", task
591 )
592 return
593 elif command == "show":
594 print("not implemented show with wim_account")
595 sys.stdout.flush()
596 return
597 elif command in ("edit", "edited"):
598 task = asyncio.ensure_future(self.wim.edit(params, order_id))
599 self.lcm_tasks.register(
600 "wim_account", wim_id, order_id, "wim_edit", task
601 )
602 return
603 elif command == "deleted":
604 return # TODO cleaning of task just in case should be done
605 elif topic == "sdn":
606 _sdn_id = params["_id"]
607 if command in ("create", "created"):
608 if not self.config["ro_config"].get("ng"):
609 task = asyncio.ensure_future(self.sdn.create(params, order_id))
610 self.lcm_tasks.register(
611 "sdn", _sdn_id, order_id, "sdn_create", task
612 )
613 return
614 elif command == "delete" or command == "deleted":
615 self.lcm_tasks.cancel(topic, _sdn_id)
616 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
617 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
618 return
619 elif command in ("edit", "edited"):
620 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
621 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
622 return
623 elif command == "deleted":
624 return # TODO cleaning of task just in case should be done
625 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
626
627 async def kafka_read(self):
628 self.logger.debug(
629 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
630 )
631 # future = asyncio.Future()
632 self.consecutive_errors = 0
633 self.first_start = True
634 while self.consecutive_errors < 10:
635 try:
636 topics = (
637 "ns",
638 "vim_account",
639 "wim_account",
640 "sdn",
641 "nsi",
642 "k8scluster",
643 "vca",
644 "k8srepo",
645 "pla",
646 )
647 topics_admin = ("admin",)
648 await asyncio.gather(
649 self.msg.aioread(
650 topics, self.loop, self.kafka_read_callback, from_beginning=True
651 ),
652 self.msg_admin.aioread(
653 topics_admin,
654 self.loop,
655 self.kafka_read_callback,
656 group_id=False,
657 ),
658 )
659
660 except LcmExceptionExit:
661 self.logger.debug("Bye!")
662 break
663 except Exception as e:
664 # if not first_start is the first time after starting. So leave more time and wait
665 # to allow kafka starts
666 if self.consecutive_errors == 8 if not self.first_start else 30:
667 self.logger.error(
668 "Task kafka_read task exit error too many errors. Exception: {}".format(
669 e
670 )
671 )
672 raise
673 self.consecutive_errors += 1
674 self.logger.error(
675 "Task kafka_read retrying after Exception {}".format(e)
676 )
677 wait_time = 2 if not self.first_start else 5
678 await asyncio.sleep(wait_time, loop=self.loop)
679
680 # self.logger.debug("Task kafka_read terminating")
681 self.logger.debug("Task kafka_read exit")
682
683 def start(self):
684
685 # check RO version
686 self.loop.run_until_complete(self.check_RO_version())
687
688 self.ns = ns.NsLcm(
689 self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus
690 )
691 self.netslice = netslice.NetsliceLcm(
692 self.msg, self.lcm_tasks, self.config, self.loop, self.ns
693 )
694 self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
695 self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
696 self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
697 self.k8scluster = vim_sdn.K8sClusterLcm(
698 self.msg, self.lcm_tasks, self.config, self.loop
699 )
700 self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
701 self.k8srepo = vim_sdn.K8sRepoLcm(
702 self.msg, self.lcm_tasks, self.config, self.loop
703 )
704
705 # configure tsdb prometheus
706 if self.prometheus:
707 self.loop.run_until_complete(self.prometheus.start())
708
709 self.loop.run_until_complete(
710 asyncio.gather(self.kafka_read(), self.kafka_ping())
711 )
712 # TODO
713 # self.logger.debug("Terminating cancelling creation tasks")
714 # self.lcm_tasks.cancel("ALL", "create")
715 # timeout = 200
716 # while self.is_pending_tasks():
717 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
718 # await asyncio.sleep(2, loop=self.loop)
719 # timeout -= 2
720 # if not timeout:
721 # self.lcm_tasks.cancel("ALL", "ALL")
722 self.loop.close()
723 self.loop = None
724 if self.db:
725 self.db.db_disconnect()
726 if self.msg:
727 self.msg.disconnect()
728 if self.msg_admin:
729 self.msg_admin.disconnect()
730 if self.fs:
731 self.fs.fs_disconnect()
732
733 def read_config_file(self, config_file):
734 # TODO make a [ini] + yaml inside parser
735 # the configparser library is not suitable, because it does not admit comments at the end of line,
736 # and not parse integer or boolean
737 try:
738 # read file as yaml format
739 with open(config_file) as f:
740 conf = yaml.load(f, Loader=yaml.Loader)
741 # Ensure all sections are not empty
742 for k in (
743 "global",
744 "timeout",
745 "RO",
746 "VCA",
747 "database",
748 "storage",
749 "message",
750 ):
751 if not conf.get(k):
752 conf[k] = {}
753
754 # read all environ that starts with OSMLCM_
755 for k, v in environ.items():
756 if not k.startswith("OSMLCM_"):
757 continue
758 subject, _, item = k[7:].lower().partition("_")
759 if not item:
760 continue
761 if subject in ("ro", "vca"):
762 # put in capital letter
763 subject = subject.upper()
764 try:
765 if item == "port" or subject == "timeout":
766 conf[subject][item] = int(v)
767 else:
768 conf[subject][item] = v
769 except Exception as e:
770 self.logger.warning(
771 "skipping environ '{}' on exception '{}'".format(k, e)
772 )
773
774 # backward compatibility of VCA parameters
775
776 if "pubkey" in conf["VCA"]:
777 conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
778 if "cacert" in conf["VCA"]:
779 conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
780 if "apiproxy" in conf["VCA"]:
781 conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")
782
783 if "enableosupgrade" in conf["VCA"]:
784 conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
785 if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
786 if conf["VCA"]["enable_os_upgrade"].lower() == "false":
787 conf["VCA"]["enable_os_upgrade"] = False
788 elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
789 conf["VCA"]["enable_os_upgrade"] = True
790
791 if "aptmirror" in conf["VCA"]:
792 conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")
793
794 return conf
795 except Exception as e:
796 self.logger.critical("At config file '{}': {}".format(config_file, e))
797 exit(1)
798
799 @staticmethod
800 def get_process_id():
801 """
802 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
803 will provide a random one
804 :return: Obtained ID
805 """
806 # Try getting docker id. If fails, get pid
807 try:
808 with open("/proc/self/cgroup", "r") as f:
809 text_id_ = f.readline()
810 _, _, text_id = text_id_.rpartition("/")
811 text_id = text_id.replace("\n", "")[:12]
812 if text_id:
813 return text_id
814 except Exception:
815 pass
816 # Return a random id
817 return "".join(random_choice("0123456789abcdef") for _ in range(12))
818
819
def usage():
    """Print command-line help for the lcm entry point."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
        sys.argv[0]
    )
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
833
834 if __name__ == "__main__":
835
836 try:
837 # print("SYS.PATH='{}'".format(sys.path))
838 # load parameters and configuration
839 # -h
840 # -c value
841 # --config value
842 # --help
843 # --health-check
844 opts, args = getopt.getopt(
845 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
846 )
847 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
848 config_file = None
849 for o, a in opts:
850 if o in ("-h", "--help"):
851 usage()
852 sys.exit()
853 elif o in ("-c", "--config"):
854 config_file = a
855 elif o == "--health-check":
856 from osm_lcm.lcm_hc import health_check
857
858 health_check(health_check_file, Lcm.ping_interval_pace)
859 # elif o == "--log-socket-port":
860 # log_socket_port = a
861 # elif o == "--log-socket-host":
862 # log_socket_host = a
863 # elif o == "--log-file":
864 # log_file = a
865 else:
866 assert False, "Unhandled option"
867
868 if config_file:
869 if not path.isfile(config_file):
870 print(
871 "configuration file '{}' does not exist".format(config_file),
872 file=sys.stderr,
873 )
874 exit(1)
875 else:
876 for config_file in (
877 __file__[: __file__.rfind(".")] + ".cfg",
878 "./lcm.cfg",
879 "/etc/osm/lcm.cfg",
880 ):
881 if path.isfile(config_file):
882 break
883 else:
884 print(
885 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
886 file=sys.stderr,
887 )
888 exit(1)
889 lcm = Lcm(config_file)
890 lcm.start()
891 except (LcmException, getopt.GetoptError) as e:
892 print(str(e), file=sys.stderr)
893 # usage()
894 exit(1)