#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import pdb

import os
import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys
from random import SystemRandom

from osm_lcm import ns, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_hc import get_health_check_file
from os import path, getenv
from n2vc import version as n2vc_version
import traceback

if getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
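# Startup aborts if N2VC, osm_common or RO are older than the minimum versions
# above; see the versiontuple() checks in Lcm.__init__ and Lcm.check_RO_version.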


class Lcm:
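    """OSM Life Cycle Management (LCM) service.

    Connects to the database, filesystem storage and message bus; dispatches
    the operations read from the bus to the per-topic engines (ns, netslice,
    vim_sdn, ...); and keeps the resulting asyncio tasks in a TaskRegistry so
    they can be tracked and cancelled.
    """
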
    ping_interval_pace = (
        120  # seconds between pings once everything is confirmed to be running
    )
    ping_interval_boot = 5  # seconds between pings while booting

    main_config = LcmCfg()

    def __init__(self, config_file):
        """
        Init: connect to database, filesystem storage and messaging
        :param config_file: path to a YAML configuration file. It is loaded into a
            two-level dictionary; the top level should contain 'database', 'storage',
            'message', etc.
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        self.logger.critical(
            "Loaded configuration: " + str(self.main_config.to_dict())
        )
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        self.ns = self.netslice = self.vim = self.wim = None
        self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Incompatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Incompatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = asyncio.get_event_loop()
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

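    # check_RO_version probes the Resource Orchestrator: it tries the
    # new-generation endpoint first ("<uri>ro") and falls back to the classic
    # one ("<uri>openmano"), retrying up to 14 times with 5-second pauses
    # before giving up.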
    async def check_RO_version(self):
        tries = 14
        last_error = None
        while True:
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try the new-generation RO first; on failure, fall back to the old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Incompatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} (new-generation: {})".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text
                        + ". Retrying for up to {} more seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)

    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

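    # Liveness self-check: kafka_ping periodically publishes a "ping" message
    # on the "admin" topic addressed to this same process.  kafka_read_callback
    # resets self.pings_not_received when the ping comes back; if more than 10
    # pings go unanswered, the loop raises and the service exits.  The payload
    # written to the bus (serialization is handled by osm_common) looks like:
    #   {"from": "lcm", "to": "lcm", "worker_id": "<12-hex-id>", "version": "<lcm_version>"}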
    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                )
                # the interval between pings is short while booting and while
                # pings are not being received
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("No pings received from the Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # on the first start allow more consecutive errors, to give
                # Kafka time to come up
                if consecutive_errors == (30 if first_start else 8):
                    self.logger.error(
                        "Task kafka_ping exiting after too many consecutive errors."
                        " Exception: {}".format(e)
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_ping retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time)

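    # kafka_read_callback is the single dispatch point for every message read
    # from the bus: it maps (topic, command) pairs to the matching engine
    # coroutine, schedules it as an asyncio task and registers the task in
    # self.lcm_tasks so it can later be cancelled (see "nslcmops"/"cancel").
    # Payloads are operation documents; e.g. a "ns"/"instantiate" message
    # carries the nslcmop document, whose "_id" and "nsInstanceId" fields are
    # used below to key the task registry.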
    async def kafka_read_callback(self, topic, command, params):
        order_id = 1

        # log every received message except the admin self-pings
        if topic != "admin" or command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.ensure_future(self.test(params))
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
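                # touch the health-check file with the current timestamp: the
                # file holds a single float as written by str(time()), e.g.
                # "1700000000.123", which the osm_lcm.lcm_hc helper is expected
                # to read back to decide whether this LCM instance is alive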
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "nslcmops":
            if command == "cancel":
                nslcmop_id = params["_id"]
                self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
                nsr_id = params["nsInstanceId"]
                # cancel the tasks and wait
                for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
                    try:
                        await task
                        self.logger.debug(
                            "Cancelled task ended {},{},{}".format(
                                nsr_id, nslcmop_id, task
                            )
                        )
                    except asyncio.CancelledError:
                        self.logger.debug(
                            "Task already cancelled and finished {},{},{}".format(
                                nsr_id, nslcmop_id, task
                            )
                        )
                # update DB
                q_filter = {"_id": nslcmop_id}
                update_dict = {
                    "operationState": "FAILED_TEMP",
                    "isCancelPending": False,
                }
                unset_dict = {
                    "cancelMode": None,
                }
                self.db.set_one(
                    "nslcmops",
                    q_filter=q_filter,
                    update_dict=update_dict,
                    fail_on_empty=False,
                    unset=unset_dict,
                )
                self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
            ):  # "scaled-cooldown-time"
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical(
            "unhandled topic '{}' and/or command '{}'".format(topic, command)
        )

    async def kafka_read(self):
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                    "nslcmops",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics,
                        aiocallback=self.kafka_read_callback,
                        from_beginning=True,
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        aiocallback=self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # on the first start allow more consecutive errors, to give
                # Kafka time to come up
                if self.consecutive_errors == (30 if self.first_start else 8):
                    self.logger.error(
                        "Task kafka_read exiting after too many consecutive errors."
                        " Exception: {}".format(e)
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time)

        self.logger.debug("Task kafka_read exit")

    async def kafka_read_ping(self):
        await asyncio.gather(self.kafka_read(), self.kafka_ping())

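    # start() is the service entry point: it verifies the RO version,
    # instantiates the per-topic engines and then blocks on kafka_read_ping()
    # until an "exit" command or a fatal error; the disconnects at the end run
    # on the way out.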
    async def start(self):
        # check RO version
        await self.check_RO_version()

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )

        await self.kafka_read_ping()

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

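    # read_config_file loads a plain YAML file; the authoritative schema is
    # osm_lcm.data_utils.lcm_config.LcmCfg. A minimal sketch of the sections
    # this module touches (values are illustrative placeholders, not verified
    # defaults):
    #
    #   global:
    #       loglevel: DEBUG
    #   database:
    #       driver: mongo
    #   storage:
    #       driver: local
    #   message:
    #       driver: kafka        # "local" or "kafka", as handled in __init__
    #       group_id: lcm-server
    #   RO:
    #       uri: http://ro:9090/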
    def read_config_file(self, config_file):
        try:
            with open(config_file) as f:
                return yaml.safe_load(f)
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running inside a Docker container it
        returns the container ID; otherwise it generates a random one
        :return: Obtained ID
        """

        def get_docker_id():
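            # on cgroup v1 the first line of /proc/self/cgroup typically looks
            # like "12:devices:/docker/<64-hex-container-id>"; the container id
            # is the last path component, truncated to the 12-char short form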
            try:
                with open("/proc/self/cgroup", "r") as f:
                    text_id_ = f.readline()
                    _, _, text_id = text_id_.rpartition("/")
                    return text_id.replace("\n", "")[:12]
            except Exception:
                return None

        def generate_random_id():
            return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))

        # Try getting docker id. If it fails, generate a random id
        docker_id = get_docker_id()
        return docker_id if docker_id else generate_random_id()


def usage():
    print(
        """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
            sys.argv[0]
        )
    )
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


if __name__ == "__main__":
    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            else:
                print(f"Unhandled option: {o}")
                exit(1)

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found at the local folder or at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file)))
        lcm = Lcm(config_file)
        asyncio.run(lcm.start())
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)