Bug 2042 fixed
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import pdb
23
24 import os
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31 from random import SystemRandom
32
33 from osm_lcm import ns, vim_sdn, netslice
34 from osm_lcm.ng_ro import NgRoException, NgRoClient
35 from osm_lcm.ROclient import ROClient, ROClientException
36
37 from time import time
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48 from osm_lcm.data_utils.lcm_config import LcmCfg
49 from osm_lcm.lcm_hc import get_health_check_file
50 from os import path, getenv
51 from n2vc import version as n2vc_version
52 import traceback
53
# Optionally drop into the debugger at import time (pairs with "import pdb" above)
if getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum compatible versions of peer OSM components; enforced at startup
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
63
64
class Lcm:
    """OSM Life Cycle Management (LCM) service.

    Connects to the database, filesystem storage and message bus (Kafka or
    local), then consumes lifecycle-operation messages (NS, NSI, VIM, WIM,
    SDN, K8s, VCA topics) and dispatches them to the corresponding handler
    classes, tracking the spawned asyncio tasks so they can be cancelled.
    """

    # Seconds between self-pings once bus connectivity is confirmed
    ping_interval_pace = (
        120  # how many time ping is send once is confirmed all is running
    )
    # Seconds between self-pings while booting (before the first ping returns)
    ping_interval_boot = 5  # how many time ping is sent when booting

    # NOTE: class-level attribute (shared across instances); __init__ fills it
    # in place from the configuration file
    main_config = LcmCfg()
    def __init__(self, config_file):
        """
        Init: connect to database, filesystem storage and messaging.

        :param config_file: path to the YAML configuration file; parsed into
            self.main_config (two-level dict: 'database', 'storage', 'message', ...)
        :raises LcmException: on incompatible component versions or on any
            database/filesystem/messaging connection error
        :return: None
        """
        # Connection handles; filled below, checked for None on shutdown
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        # Self-ping bookkeeping shared with kafka_ping/kafka_read_callback
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        # Topic handler instances; created later in start()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = asyncio.get_event_loop()
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
195
    async def check_RO_version(self):
        """Connect to RO and verify its version is at least min_RO_version.

        Tries the new-generation RO first (uri suffix "ro") and falls back to
        the classic one (suffix "openmano"), recording which one answered in
        self.main_config.RO.ng. Retries up to 14 times with a 5-second sleep
        between attempts.

        :raises LcmException: when retries are exhausted or the reported RO
            version is older than min_RO_version
        """
        tries = 14
        last_error = None
        while True:
            # NOTE: self.main_config.RO.uri is mutated below to probe each
            # endpoint; keep the original value to restore it on failure
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(**self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the base uri before retrying
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # log only when the error text changes, to avoid flooding
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
243
244 async def test(self, param=None):
245 self.logger.debug("Starting/Ending test task: {}".format(param))
246
247 async def kafka_ping(self):
248 self.logger.debug("Task kafka_ping Enter")
249 consecutive_errors = 0
250 first_start = True
251 kafka_has_received = False
252 self.pings_not_received = 1
253 while True:
254 try:
255 await self.msg_admin.aiowrite(
256 "admin",
257 "ping",
258 {
259 "from": "lcm",
260 "to": "lcm",
261 "worker_id": self.worker_id,
262 "version": lcm_version,
263 },
264 )
265 # time between pings are low when it is not received and at starting
266 wait_time = (
267 self.ping_interval_boot
268 if not kafka_has_received
269 else self.ping_interval_pace
270 )
271 if not self.pings_not_received:
272 kafka_has_received = True
273 self.pings_not_received += 1
274 await asyncio.sleep(wait_time)
275 if self.pings_not_received > 10:
276 raise LcmException("It is not receiving pings from Kafka bus")
277 consecutive_errors = 0
278 first_start = False
279 except LcmException:
280 raise
281 except Exception as e:
282 # if not first_start is the first time after starting. So leave more time and wait
283 # to allow kafka starts
284 if consecutive_errors == 8 if not first_start else 30:
285 self.logger.error(
286 "Task kafka_read task exit error too many errors. Exception: {}".format(
287 e
288 )
289 )
290 raise
291 consecutive_errors += 1
292 self.logger.error(
293 "Task kafka_read retrying after Exception {}".format(e)
294 )
295 wait_time = 2 if not first_start else 5
296 await asyncio.sleep(wait_time)
297
    async def kafka_read_callback(self, topic, command, params):
        """Handle one message read from the bus and dispatch it to a handler.

        Long-running operations are wrapped in asyncio tasks and registered in
        self.lcm_tasks so they can be cancelled later.

        :param topic: bus topic the message arrived on ("ns", "nsi",
            "vim_account", "admin", ...)
        :param command: requested operation ("instantiate", "delete", "ping", ...)
        :param params: message payload; normally the operation document (dict)
            or, for "show" commands, a record id
        :raises LcmExceptionExit: on the "exit" command, to stop kafka_read
        """
        # pseudo operation id used to register tasks of topics that carry no
        # operation id of their own (vim/wim/sdn/k8s/vca)
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # any received message proves the bus read-path is alive
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # commands that apply regardless of topic
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            # commented-out message: ignore
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params))
            return

        if topic == "admin":
            # self-ping reception: reset the lost-ping counter and refresh the
            # healthcheck file with the current timestamp
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    # ping of another LCM worker: not ours, ignore
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "nslcmops":
            # cancellation of an ongoing NS operation
            if command == "cancel":
                nslcmop_id = params["_id"]
                self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
                nsr_id = params["nsInstanceId"]
                # cancel the tasks and wait
                for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
                    try:
                        await task
                        self.logger.debug(
                            "Cancelled task ended {},{},{}".format(
                                nsr_id, nslcmop_id, task
                            )
                        )
                    except asyncio.CancelledError:
                        self.logger.debug(
                            "Task already cancelled and finished {},{},{}".format(
                                nsr_id, nslcmop_id, task
                            )
                        )
                # update DB
                q_filter = {"_id": nslcmop_id}
                update_dict = {
                    "operationState": "FAILED_TEMP",
                    "isCancelPending": False,
                }
                unset_dict = {
                    "cancelMode": None,
                }
                self.db.set_one(
                    "nslcmops",
                    q_filter=q_filter,
                    update_dict=update_dict,
                    fail_on_empty=False,
                    unset=unset_dict,
                )
                self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            # network-service lifecycle operations, delegated to NsLcm
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # a terminate first cancels any pending task of this NS
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                # debugging aid: dump the NS record status to stdout
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # completion notifications: nothing to do here
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # debugging aid: dump the slice record status to stdout
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # only handled here when not using new-generation RO
                # (presumably NG-RO manages vim accounts itself — confirm)
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        # nothing above matched (or a topic branch fell through on an
        # unrecognized command): report it loudly
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
687
688 async def kafka_read(self):
689 self.logger.debug(
690 "Task kafka_read Enter with worker_id={}".format(self.worker_id)
691 )
692 self.consecutive_errors = 0
693 self.first_start = True
694 while self.consecutive_errors < 10:
695 try:
696 topics = (
697 "ns",
698 "vim_account",
699 "wim_account",
700 "sdn",
701 "nsi",
702 "k8scluster",
703 "vca",
704 "k8srepo",
705 "pla",
706 "nslcmops",
707 )
708 topics_admin = ("admin",)
709 await asyncio.gather(
710 self.msg.aioread(
711 topics,
712 aiocallback=self.kafka_read_callback,
713 from_beginning=True,
714 ),
715 self.msg_admin.aioread(
716 topics_admin,
717 aiocallback=self.kafka_read_callback,
718 group_id=False,
719 ),
720 )
721
722 except LcmExceptionExit:
723 self.logger.debug("Bye!")
724 break
725 except Exception as e:
726 # if not first_start is the first time after starting. So leave more time and wait
727 # to allow kafka starts
728 if self.consecutive_errors == 8 if not self.first_start else 30:
729 self.logger.error(
730 "Task kafka_read task exit error too many errors. Exception: {}".format(
731 e
732 )
733 )
734 raise
735 self.consecutive_errors += 1
736 self.logger.error(
737 "Task kafka_read retrying after Exception {}".format(e)
738 )
739 wait_time = 2 if not self.first_start else 5
740 await asyncio.sleep(wait_time)
741
742 self.logger.debug("Task kafka_read exit")
743
744 async def kafka_read_ping(self):
745 await asyncio.gather(self.kafka_read(), self.kafka_ping())
746
    async def start(self):
        """Run the LCM service until the bus consumer finishes.

        Verifies RO compatibility, instantiates all per-topic handler classes,
        then consumes the bus (kafka_read_ping). On return, disconnects the
        database, message clients and filesystem storage.
        """
        # check RO version
        await self.check_RO_version()

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict()
        )

        # blocks until kafka_read ends (exit command or too many errors)
        await self.kafka_read_ping()

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
787
788 def read_config_file(self, config_file):
789 try:
790 with open(config_file) as f:
791 return yaml.safe_load(f)
792 except Exception as e:
793 self.logger.critical("At config file '{}': {}".format(config_file, e))
794 exit(1)
795
796 @staticmethod
797 def get_process_id():
798 """
799 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
800 will provide a random one
801 :return: Obtained ID
802 """
803
804 def get_docker_id():
805 try:
806 with open("/proc/self/cgroup", "r") as f:
807 text_id_ = f.readline()
808 _, _, text_id = text_id_.rpartition("/")
809 return text_id.replace("\n", "")[:12]
810 except Exception:
811 return None
812
813 def generate_random_id():
814 return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
815
816 # Try getting docker id. If it fails, generate a random id
817 docker_id = get_docker_id()
818 return docker_id if docker_id else generate_random_id()
819
820
def usage():
    """Print command-line help for the LCM entry point to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0])
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
833
834
if __name__ == "__main__":
    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # NOTE(review): options are handled in command-line order, so
                # --health-check sees config_file=None unless -c precedes it
                # on the command line — confirm this ordering is intended
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            else:
                print(f"Unhandled option: {o}")
                exit(1)

        if config_file:
            # an explicitly given config file must exist
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # otherwise search default locations: next to this module, the
            # current directory, then /etc/osm
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file)))
        lcm = Lcm(config_file)
        asyncio.run(lcm.start())
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)