# Source: osm/LCM.git - osm_lcm/lcm.py (commit: "Fixes VCA deletion in NS termination")
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import pdb
23
24 import asyncio
25 import yaml
26 import logging
27 import logging.handlers
28 import getopt
29 import sys
30 from random import SystemRandom
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from osm_lcm.data_utils.lcm_config import LcmCfg
48 from osm_lcm.lcm_hc import get_health_check_file
49 from os import path, getenv
50 from n2vc import version as n2vc_version
51 import traceback
52
# Drop into the debugger at import time when OSMLCM_PDB_DEBUG is set (any value).
if getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of the peer OSM components this LCM is compatible with.
# They are checked at startup (RO in check_RO_version, the others in __init__).
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
62
63
64 class Lcm:
65 ping_interval_pace = (
66 120 # how many time ping is send once is confirmed all is running
67 )
68 ping_interval_boot = 5 # how many time ping is sent when booting
69
70 main_config = LcmCfg()
71
    def __init__(self, config_file):
        """
        Init, Connect to database, filesystem storage, and messaging.

        :param config_file: path to the YAML configuration file. Its top level
            should contain the 'database', 'storage', 'message', 'RO' and
            global sections consumed by LcmCfg.
        :return: None
        :raises LcmException: on incompatible n2vc/common versions or when the
            database / filesystem / message bus cannot be reached.
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        # Counter of kafka pings sent without an answer; kafka_read_callback
        # resets it to 0 when our own ping comes back through the bus.
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id (docker container id if available, otherwise a random one)
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        # critical level so the dump is always emitted regardless of loglevel
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        # Topic handlers; they are instantiated later, in start(), once the RO
        # version (and therefore RO.ng) is known.
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        # Console logging can be disabled with 'nologging' (e.g. when a logfile
        # is configured).
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules: each subsystem has its own logger name,
        # logfile and loglevel in the configuration.
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            # (msg_admin must NOT share a consumer group, so every worker
            # receives its own ping messages)
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = asyncio.get_event_loop()
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
194
    async def check_RO_version(self):
        """
        Probe the Resource Orchestrator until it answers, and validate its version.

        Tries the new-generation RO API first (path "ro"); if that fails for any
        reason it falls back to the legacy API (path "openmano"). The result of
        the probe is recorded in self.main_config.RO.ng.

        :return: None on success.
        :raises LcmException: if the RO version is below min_RO_version, or if
            the RO is still unreachable after 14 attempts (5 s apart).
        """
        tries = 14
        last_error = None
        while True:
            # Base URI is re-read each iteration; the suffixes below are
            # appended to it and restored on failure.
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # Restore the unsuffixed URI before retrying.
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # Only log when the error message changes, to avoid flooding.
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
242
243 async def test(self, param=None):
244 self.logger.debug("Starting/Ending test task: {}".format(param))
245
246 async def kafka_ping(self):
247 self.logger.debug("Task kafka_ping Enter")
248 consecutive_errors = 0
249 first_start = True
250 kafka_has_received = False
251 self.pings_not_received = 1
252 while True:
253 try:
254 await self.msg_admin.aiowrite(
255 "admin",
256 "ping",
257 {
258 "from": "lcm",
259 "to": "lcm",
260 "worker_id": self.worker_id,
261 "version": lcm_version,
262 },
263 )
264 # time between pings are low when it is not received and at starting
265 wait_time = (
266 self.ping_interval_boot
267 if not kafka_has_received
268 else self.ping_interval_pace
269 )
270 if not self.pings_not_received:
271 kafka_has_received = True
272 self.pings_not_received += 1
273 await asyncio.sleep(wait_time)
274 if self.pings_not_received > 10:
275 raise LcmException("It is not receiving pings from Kafka bus")
276 consecutive_errors = 0
277 first_start = False
278 except LcmException:
279 raise
280 except Exception as e:
281 # if not first_start is the first time after starting. So leave more time and wait
282 # to allow kafka starts
283 if consecutive_errors == 8 if not first_start else 30:
284 self.logger.error(
285 "Task kafka_read task exit error too many errors. Exception: {}".format(
286 e
287 )
288 )
289 raise
290 consecutive_errors += 1
291 self.logger.error(
292 "Task kafka_read retrying after Exception {}".format(e)
293 )
294 wait_time = 2 if not first_start else 5
295 await asyncio.sleep(wait_time)
296
    async def kafka_read_callback(self, topic, command, params):
        """
        Dispatch one message received from the kafka bus.

        :param topic: kafka topic the message arrived on ("ns", "nsi", "admin",
            "vim_account", "wim_account", "sdn", "k8scluster", "vca", "k8srepo",
            "pla", "nslcmops")
        :param command: operation requested (e.g. "instantiate", "delete", "ping")
        :param params: message payload; usually a dict with the operation data,
            although some commands ("show") carry a plain id string
        :return: None. Long-running operations are spawned as asyncio tasks and
            registered in self.lcm_tasks; this callback does not await them.
        :raises LcmExceptionExit: on the "exit" command, to stop kafka_read.
        """
        order_id = 1

        # NOTE(review): with "and", logging is skipped for EVERY admin message
        # and EVERY "ping" command on any topic; presumably only admin-ping was
        # meant to be silenced ("or" / not(... and ...)) -- confirm before changing.
        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # Any received message proves the bus is alive.
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # Topic-independent control commands.
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params))
            return

        if topic == "admin":
            # Only our own pings (matching worker_id) are processed; they reset
            # the unanswered-ping counter and refresh the healthcheck file.
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "nslcmops":
            if command == "cancel":
                nslcmop_id = params["_id"]
                self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
                nsr_id = params["nsInstanceId"]
                # cancel the tasks and wait
                for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
                    try:
                        await task
                        self.logger.debug(
                            "Cancelled task ended {},{},{}".format(
                                nsr_id, nslcmop_id, task
                            )
                        )
                    except asyncio.CancelledError:
                        self.logger.debug(
                            "Task already cancelled and finished {},{},{}".format(
                                nsr_id, nslcmop_id, task
                            )
                        )
                # update DB: mark the operation as temporarily failed and clear
                # the cancel request flags
                q_filter = {"_id": nslcmop_id}
                update_dict = {
                    "operationState": "FAILED_TEMP",
                    "isCancelPending": False,
                }
                unset_dict = {
                    "cancelMode": None,
                }
                self.db.set_one(
                    "nslcmops",
                    q_filter=q_filter,
                    update_dict=update_dict,
                    fail_on_empty=False,
                    unset=unset_dict,
                )
                self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # abort any in-flight tasks for this NS before terminating
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                # debugging helper: params is a plain nsr id string here
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            # completion notifications from other components: nothing to do
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # debugging helper: params is a plain nsir id string here
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with new-generation RO the account is handled by RO itself
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            # NOTE(review): unreachable -- "deleted" is already matched above.
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            # NOTE(review): unlike vim_account, wim edit is not guarded by
            # RO.ng -- confirm whether that asymmetry is intended.
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            # NOTE(review): unreachable -- "deleted" is already matched above.
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            # NOTE(review): unreachable -- "deleted" is already matched above.
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
686
687 async def kafka_read(self):
688 self.logger.debug(
689 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
690 )
691 self.consecutive_errors = 0
692 self.first_start = True
693 while self.consecutive_errors < 10:
694 try:
695 topics = (
696 "ns",
697 "vim_account",
698 "wim_account",
699 "sdn",
700 "nsi",
701 "k8scluster",
702 "vca",
703 "k8srepo",
704 "pla",
705 "nslcmops",
706 )
707 topics_admin = ("admin",)
708 await asyncio.gather(
709 self.msg.aioread(
710 topics,
711 aiocallback=self.kafka_read_callback,
712 from_beginning=True,
713 ),
714 self.msg_admin.aioread(
715 topics_admin,
716 aiocallback=self.kafka_read_callback,
717 group_id=False,
718 ),
719 )
720
721 except LcmExceptionExit:
722 self.logger.debug("Bye!")
723 break
724 except Exception as e:
725 # if not first_start is the first time after starting. So leave more time and wait
726 # to allow kafka starts
727 if self.consecutive_errors == 8 if not self.first_start else 30:
728 self.logger.error(
729 "Task kafka_read task exit error too many errors. Exception: {}".format(
730 e
731 )
732 )
733 raise
734 self.consecutive_errors += 1
735 self.logger.error(
736 "Task kafka_read retrying after Exception {}".format(e)
737 )
738 wait_time = 2 if not self.first_start else 5
739 await asyncio.sleep(wait_time)
740
741 self.logger.debug("Task kafka_read exit")
742
743 async def kafka_read_ping(self):
744 await asyncio.gather(self.kafka_read(), self.kafka_ping())
745
    async def start(self):
        """
        Main service loop: validate RO, build topic handlers, consume the bus.

        Blocks in kafka_read_ping() until an "exit" command (or a fatal error),
        then disconnects database, message bus and filesystem.
        """
        # check RO version (also decides legacy vs new-generation RO mode)
        await self.check_RO_version()

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )

        # Blocks here until LcmExceptionExit (or too many consecutive errors).
        await self.kafka_read_ping()

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        # Orderly shutdown of every backend connection that was established.
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
786
787 def read_config_file(self, config_file):
788 try:
789 with open(config_file) as f:
790 return yaml.safe_load(f)
791 except Exception as e:
792 self.logger.critical("At config file '{}': {}".format(config_file, e))
793 exit(1)
794
795 @staticmethod
796 def get_process_id():
797 """
798 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
799 will provide a random one
800 :return: Obtained ID
801 """
802
803 def get_docker_id():
804 try:
805 with open("/proc/self/cgroup", "r") as f:
806 text_id_ = f.readline()
807 _, _, text_id = text_id_.rpartition("/")
808 return text_id.replace("\n", "")[:12]
809 except Exception:
810 return None
811
812 def generate_random_id():
813 return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
814
815 # Try getting docker id. If it fails, generate a random id
816 docker_id = get_docker_id()
817 return docker_id if docker_id else generate_random_id()
818
819
def usage():
    """Print the command-line help text for the LCM daemon to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0])
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
832
833
if __name__ == "__main__":
    # Command-line entry point: parse options, locate the configuration file,
    # build the Lcm service and run it until completion.
    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                # NOTE(review): options are processed in command-line order, so
                # config_file is only set here if -c/--config came first --
                # confirm health_check tolerates config_file=None.
                health_check(config_file, Lcm.ping_interval_pace)
            else:
                print(f"Unhandled option: {o}")
                exit(1)

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # No -c given: search the default locations; the for/else fires
            # only when no candidate file exists.
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        asyncio.run(lcm.start())
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)