Bug 2174: VCA Update is not implemented
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from osm_lcm.data_utils.lcm_config import LcmCfg
48 from osm_lcm.lcm_hc import get_health_check_file
49 from os import path
50 from random import choice as random_choice
51 from n2vc import version as n2vc_version
52 import traceback
53
# Drop into pdb at import time when OSMLCM_PDB_DEBUG is set in the
# environment (any value, even an empty string, enables it).
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of peer OSM components this LCM works with; compared with
# versiontuple() in Lcm.__init__ and Lcm.check_RO_version.
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
63
64
class Lcm:
    """OSM Life Cycle Management engine.

    Connects to the common database, filesystem and message bus, then
    dispatches LCM operations (ns, nsi, vim/wim/sdn, k8s, vca, ...) read
    from the bus to the corresponding helper classes.
    """

    ping_interval_pace = (
        120  # seconds between self-pings once the bus is confirmed working
    )
    ping_interval_boot = 5  # seconds between self-pings while booting

    # NOTE(review): class-level attribute, shared by all Lcm instances
    # (also accessed as Lcm.ping_interval_pace/main_config from __main__)
    main_config = LcmCfg()
73
    def __init__(self, config_file, loop=None):
        """
        Init: load configuration, set up logging, connect to database,
        filesystem storage and messaging, and check peer component versions.

        :param config_file: path of the YAML configuration file; parsed into
            the (class-level) main_config LcmCfg object
        :param loop: optional asyncio event loop; a default one is obtained
            when not provided
        :raises LcmException: on incompatible N2VC/common versions, invalid
            message driver, or any Db/Fs/Msg connection error
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id (docker container id or random; see get_process_id)
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        self.loop = loop or asyncio.get_event_loop()
        # per-topic helper objects; created later in start()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging: optional rotating file handler plus stderr stream handler
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules (each one has its own logger name/file/level)
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            # (admin bus has no consumer group so every worker sees every ping)
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
197
    async def check_RO_version(self):
        """Connect to RO and verify its version is at least min_RO_version.

        Tries the new-generation RO first (uri suffix "ro") and falls back to
        the legacy openmano API (suffix "openmano"), recording which one
        answered in self.main_config.RO.ng.  Connection errors are retried up
        to 14 times, 5 seconds apart.

        :raises LcmException: when retries are exhausted or the RO version is
            incompatible (the version mismatch is not retried: LcmException is
            not in the except clause below)
        :return: None
        """
        tries = 14
        last_error = None
        while True:
            # RO.uri is mutated below, so re-read the base value each attempt
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try new RO, if fail old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the unsuffixed uri before the next attempt
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # only log when the error text changes, to avoid log spam
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
245
246 async def test(self, param=None):
247 self.logger.debug("Starting/Ending test task: {}".format(param))
248
    async def kafka_ping(self):
        """Periodically self-ping through the admin topic to monitor the bus.

        Writes a "ping" message addressed to this same worker; the echo is
        consumed by kafka_read_callback, which resets pings_not_received.
        Pings are sent every ping_interval_boot seconds until the first one
        is confirmed received, then every ping_interval_pace seconds.

        :raises LcmException: when more than 10 consecutive pings are lost
        :return: None (runs until an exception is raised)
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # time between pings are low when it is not received and at starting
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                # pings_not_received is zeroed by kafka_read_callback on echo
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                # NOTE(review): the "loop" kwarg of asyncio.sleep is deprecated
                # and removed in Python 3.10 — confirm target interpreter
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE: parses as "consecutive_errors == (8 if not first_start
                # else 30)" — more retries are allowed during startup
                if consecutive_errors == 8 if not first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)
300
    def kafka_read_callback(self, topic, command, params):
        """Dispatch one message read from the bus to its handler.

        Invoked by msg.aioread()/msg_admin.aioread() (see kafka_read) for
        every received message.  Long-running operations are wrapped in an
        asyncio task and registered in self.lcm_tasks so they can be
        cancelled later; past-tense commands ("created", "terminated", ...)
        are notifications and are ignored.

        :param topic: bus topic the message came from (admin, ns, nsi, vca...)
        :param command: operation name (create, delete, edit, instantiate...)
        :param params: message payload; usually a dict, but a bare record id
            for the debug "show" commands
        :return: None
        """
        # order_id is always 2 by the time it is passed to register() below
        # (initialized to 1 and incremented once, unconditionally)
        order_id = 1

        # NOTE(review): this logs only when topic != "admin" AND command !=
        # "ping", so any "ping" command and every admin-topic message goes
        # unlogged; presumably meant to silence only admin pings — confirm
        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # topic-independent debug/control commands
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            # ping echo addressed to this very worker: reset the lost-ping
            # counter and touch the healthcheck file (read by lcm_hc)
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            # "edit"/"edited" branch implements VCA update (Bug 2174)
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any in-flight task for this NS before terminating
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                # debug helper: params is the bare nsr id
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # completion notifications: nothing to do
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # debug helper: params is the bare nsir id
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with new-generation RO the account is handled by RO itself
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        # fallthrough: nothing above matched
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
645
    async def kafka_read(self):
        """Read and dispatch bus messages until exit or too many errors.

        Reads the operation topics with self.msg and the "admin" topic with
        self.msg_admin (group_id=False — presumably so every LCM worker
        receives all ping echoes); every message is handled by
        kafka_read_callback.  Stops on LcmExceptionExit ("exit" command) or
        after 10 consecutive read errors.
        :return: None
        """
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE: parses as "self.consecutive_errors == (8 if not
                # self.first_start else 30)" — more retries during startup
                if self.consecutive_errors == 8 if not self.first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")
701
    def start(self):
        """Run the LCM main loop (blocking).

        Checks the RO version, builds the per-topic helper objects, then runs
        kafka_read() and kafka_ping() until exit; finally closes the event
        loop and disconnects database, messaging and filesystem.
        :return: None
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.wim = vim_sdn.WimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.sdn = vim_sdn.SdnLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.vca = vim_sdn.VcaLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )

        # blocks here until kafka_read/kafka_ping finish (exit or failure)
        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
755
756 def read_config_file(self, config_file):
757 try:
758 with open(config_file) as f:
759 return yaml.safe_load(f)
760 except Exception as e:
761 self.logger.critical("At config file '{}': {}".format(config_file, e))
762 exit(1)
763
764 @staticmethod
765 def get_process_id():
766 """
767 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
768 will provide a random one
769 :return: Obtained ID
770 """
771 # Try getting docker id. If fails, get pid
772 try:
773 with open("/proc/self/cgroup", "r") as f:
774 text_id_ = f.readline()
775 _, _, text_id = text_id_.rpartition("/")
776 text_id = text_id.replace("\n", "")[:12]
777 if text_id:
778 return text_id
779 except Exception:
780 pass
781 # Return a random id
782 return "".join(random_choice("0123456789abcdef") for _ in range(12))
783
784
def usage():
    """Print command-line usage help for the lcm module to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
        sys.argv[0]
    )
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
797
798
if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # deferred import: health check must not pull in the whole LCM
                from osm_lcm.lcm_hc import health_check

                # NOTE(review): options are processed in command-line order,
                # so config_file is still None here if --health-check comes
                # before -c/--config — confirm intended
                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            # no -c given: look for lcm.cfg next to this file, in the current
            # folder, then in /etc/osm (for-else triggers when none is found)
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)