Reformat files according to new black validation
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from osm_lcm.data_utils.lcm_config import LcmCfg
48 from osm_lcm.lcm_hc import get_health_check_file
49 from os import path
50 from random import choice as random_choice
51 from n2vc import version as n2vc_version
52 import traceback
53
# Drop into the debugger at import time when OSMLCM_PDB_DEBUG is set (any value).
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of peer OSM components this LCM requires; checked with
# versiontuple() in Lcm.__init__ / Lcm.check_RO_version.
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
63
64
65 class Lcm:
66 ping_interval_pace = (
67 120 # how many time ping is send once is confirmed all is running
68 )
69 ping_interval_boot = 5 # how many time ping is sent when booting
70
71 main_config = LcmCfg()
72
    def __init__(self, config_file: str, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging.

        Reads the YAML configuration, sets up logging for LCM and for the
        osm_common sub-modules, validates peer component versions, and connects
        to the database, the filesystem storage and the message bus.

        :param config_file: path to the YAML configuration file. Top level should
            contain sections such as 'database', 'storage', 'message', ...
        :param loop: optional asyncio event loop; defaults to the current one
        :raises LcmException: on incompatible N2VC/common versions, bad message
            driver, or any DB/FS/message connection error
        :return: None
        """
        # connections are filled in below; kept as None so start()/teardown can
        # test which ones were actually established
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        # ping bookkeeping shared with kafka_ping()/kafka_read_callback()
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        self.loop = loop or asyncio.get_event_loop()
        # per-topic LCM engines; instantiated later in start()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules: each config section names the logger used by
        # the corresponding osm_common module
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                # msg_admin has no group_id so every worker receives admin pings
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
196
    async def check_RO_version(self):
        """
        Probe the Resource Orchestrator, detect whether it is the new-generation
        (NG-RO) or the legacy (openmano) flavor, and validate its version.

        Tries the "ro" endpoint first; on any failure falls back to "openmano".
        Sets self.main_config.RO.ng accordingly and leaves RO.uri pointing at
        the working endpoint. Retries up to 14 times with a 5 s pause on
        RO client errors.

        :raises LcmException: if RO version < min_RO_version (raised immediately,
            not retried, since LcmException is not in the retried exception
            tuple), or after exhausting all retries
        """
        tries = 14
        last_error = None
        while True:
            # remember the configured base URI so it can be restored on failure
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the original URI before the next attempt
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # only log when the error message changes, to avoid log spam
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
244
245 async def test(self, param=None):
246 self.logger.debug("Starting/Ending test task: {}".format(param))
247
248 async def kafka_ping(self):
249 self.logger.debug("Task kafka_ping Enter")
250 consecutive_errors = 0
251 first_start = True
252 kafka_has_received = False
253 self.pings_not_received = 1
254 while True:
255 try:
256 await self.msg_admin.aiowrite(
257 "admin",
258 "ping",
259 {
260 "from": "lcm",
261 "to": "lcm",
262 "worker_id": self.worker_id,
263 "version": lcm_version,
264 },
265 self.loop,
266 )
267 # time between pings are low when it is not received and at starting
268 wait_time = (
269 self.ping_interval_boot
270 if not kafka_has_received
271 else self.ping_interval_pace
272 )
273 if not self.pings_not_received:
274 kafka_has_received = True
275 self.pings_not_received += 1
276 await asyncio.sleep(wait_time, loop=self.loop)
277 if self.pings_not_received > 10:
278 raise LcmException("It is not receiving pings from Kafka bus")
279 consecutive_errors = 0
280 first_start = False
281 except LcmException:
282 raise
283 except Exception as e:
284 # if not first_start is the first time after starting. So leave more time and wait
285 # to allow kafka starts
286 if consecutive_errors == 8 if not first_start else 30:
287 self.logger.error(
288 "Task kafka_read task exit error too many errors. Exception: {}".format(
289 e
290 )
291 )
292 raise
293 consecutive_errors += 1
294 self.logger.error(
295 "Task kafka_read retrying after Exception {}".format(e)
296 )
297 wait_time = 2 if not first_start else 5
298 await asyncio.sleep(wait_time, loop=self.loop)
299
    def kafka_read_callback(self, topic, command, params):
        """
        Dispatch one message from the kafka bus to the proper LCM engine.

        Called by msg.aioread()/msg_admin.aioread() for every received message.
        Most branches create an asyncio task on the matching engine (self.ns,
        self.vim, ...) and register it in self.lcm_tasks so it can be cancelled
        later. Branches return as soon as they handle the message; falling
        through to the bottom logs an "unknown topic/command" critical.

        :param topic: kafka topic the message arrived on ("ns", "admin", ...)
        :param command: operation name ("instantiate", "delete", "ping", ...)
        :param params: message payload; usually a dict (the lcm operation), but
            a plain id string for the "show" commands
        :raises LcmExceptionExit: on the "exit" command, to stop kafka_read
        """
        # order_id is a per-callback sequence number passed to task registration;
        # it starts at 1 and is bumped to 2 below before any dispatch uses it
        order_id = 1

        # NOTE(review): this guard skips logging when topic!="admin" AND
        # command!="ping"; it looks like the intent was to skip only admin
        # pings, i.e. "not (topic == 'admin' and command == 'ping')" — confirm.
        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # any successfully received message resets the kafka_read error counters
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # topic-independent control commands
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            # answer to our own pings: touch the healthcheck file
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            # placement results computed by the PLA module
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            # network service lifecycle operations; params is the nslcmop record
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any pending task on this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                # debugging aid: dump the NS record to stdout; params is the id
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # completion notifications from other workers: nothing to do
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # debugging aid: dump the slice record to stdout; params is the id
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with NG-RO the RO itself handles vim creation, so skip
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        # nothing above matched: unexpected message
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
651
652 async def kafka_read(self):
653 self.logger.debug(
654 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
655 )
656 # future = asyncio.Future()
657 self.consecutive_errors = 0
658 self.first_start = True
659 while self.consecutive_errors < 10:
660 try:
661 topics = (
662 "ns",
663 "vim_account",
664 "wim_account",
665 "sdn",
666 "nsi",
667 "k8scluster",
668 "vca",
669 "k8srepo",
670 "pla",
671 )
672 topics_admin = ("admin",)
673 await asyncio.gather(
674 self.msg.aioread(
675 topics, self.loop, self.kafka_read_callback, from_beginning=True
676 ),
677 self.msg_admin.aioread(
678 topics_admin,
679 self.loop,
680 self.kafka_read_callback,
681 group_id=False,
682 ),
683 )
684
685 except LcmExceptionExit:
686 self.logger.debug("Bye!")
687 break
688 except Exception as e:
689 # if not first_start is the first time after starting. So leave more time and wait
690 # to allow kafka starts
691 if self.consecutive_errors == 8 if not self.first_start else 30:
692 self.logger.error(
693 "Task kafka_read task exit error too many errors. Exception: {}".format(
694 e
695 )
696 )
697 raise
698 self.consecutive_errors += 1
699 self.logger.error(
700 "Task kafka_read retrying after Exception {}".format(e)
701 )
702 wait_time = 2 if not self.first_start else 5
703 await asyncio.sleep(wait_time, loop=self.loop)
704
705 # self.logger.debug("Task kafka_read terminating")
706 self.logger.debug("Task kafka_read exit")
707
    def start(self):
        """
        Run the LCM: validate RO connectivity, build the per-topic lifecycle
        engines, then drive kafka_read and kafka_ping on the event loop until
        they finish (normally on an "exit" bus command), finally disconnecting
        everything. Blocks until the service stops.
        """
        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        # NsLcm already takes the LcmCfg object; the others still expect dicts
        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.wim = vim_sdn.WimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.sdn = vim_sdn.SdnLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.vca = vim_sdn.VcaLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )

        # run both long-lived tasks until they return/raise
        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        # disconnect only the backends that were actually connected in __init__
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
760
761 def read_config_file(self, config_file):
762 try:
763 with open(config_file) as f:
764 return yaml.safe_load(f)
765 except Exception as e:
766 self.logger.critical("At config file '{}': {}".format(config_file, e))
767 exit(1)
768
769 @staticmethod
770 def get_process_id():
771 """
772 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
773 will provide a random one
774 :return: Obtained ID
775 """
776 # Try getting docker id. If fails, get pid
777 try:
778 with open("/proc/self/cgroup", "r") as f:
779 text_id_ = f.readline()
780 _, _, text_id = text_id_.rpartition("/")
781 text_id = text_id.replace("\n", "")[:12]
782 if text_id:
783 return text_id
784 except Exception:
785 pass
786 # Return a random id
787 return "".join(random_choice("0123456789abcdef") for _ in range(12))
788
789
def usage():
    """Print command-line usage help for the LCM launcher to stdout."""
    help_text = """Usage: {} [options]
-c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
--health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
-h|--help: shows this help
""".format(
        sys.argv[0]
    )
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
802
803
804 if __name__ == "__main__":
805 try:
806 # print("SYS.PATH='{}'".format(sys.path))
807 # load parameters and configuration
808 # -h
809 # -c value
810 # --config value
811 # --help
812 # --health-check
813 opts, args = getopt.getopt(
814 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
815 )
816 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
817 config_file = None
818 for o, a in opts:
819 if o in ("-h", "--help"):
820 usage()
821 sys.exit()
822 elif o in ("-c", "--config"):
823 config_file = a
824 elif o == "--health-check":
825 from osm_lcm.lcm_hc import health_check
826
827 health_check(config_file, Lcm.ping_interval_pace)
828 # elif o == "--log-socket-port":
829 # log_socket_port = a
830 # elif o == "--log-socket-host":
831 # log_socket_host = a
832 # elif o == "--log-file":
833 # log_file = a
834 else:
835 assert False, "Unhandled option"
836
837 if config_file:
838 if not path.isfile(config_file):
839 print(
840 "configuration file '{}' does not exist".format(config_file),
841 file=sys.stderr,
842 )
843 exit(1)
844 else:
845 for config_file in (
846 __file__[: __file__.rfind(".")] + ".cfg",
847 "./lcm.cfg",
848 "/etc/osm/lcm.cfg",
849 ):
850 if path.isfile(config_file):
851 break
852 else:
853 print(
854 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
855 file=sys.stderr,
856 )
857 exit(1)
858 lcm = Lcm(config_file)
859 lcm.start()
860 except (LcmException, getopt.GetoptError) as e:
861 print(str(e), file=sys.stderr)
862 # usage()
863 exit(1)