Bug 2175: K8s-Cluster Update is not implemented
[osm/LCM.git] / osm_lcm / lcm.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_hc import get_health_check_file
from os import path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback

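# Setting the OSMLCM_PDB_DEBUG environment variable (to any value) drops into the
# pdb debugger as soon as this module is imported.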
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
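# Minimum versions of the peer components this LCM is known to work with; they are
# enforced at startup (N2VC and common in __init__, RO in check_RO_version).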
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"


class Lcm:

    ping_interval_pace = (
        120  # how often (seconds) a ping is sent once everything is confirmed running
    )
    ping_interval_boot = 5  # how often (seconds) a ping is sent while booting

    main_config = LcmCfg()

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path to the configuration file, holding a two-level
            dictionary whose top level should contain 'database', 'storage', 'message', etc.
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.main_config.set_from_dict(config)
        self.main_config.transform()
        self.main_config.load_from_env()
        self.logger.critical("Loaded configuration: " + str(self.main_config.to_dict()))
        # TODO: check if lcm_hc.py is necessary
        self.health_check_file = get_health_check_file(self.main_config.to_dict())
        self.loop = loop or asyncio.get_event_loop()
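        # Per-topic LCM engines (NS, net-slice, VIM, WIM, SDN, K8s cluster, VCA,
        # K8s repo); they are instantiated in start() once the RO version check passes.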
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        if self.main_config.globalConfig.logfile:
            file_handler = logging.handlers.RotatingFileHandler(
                self.main_config.globalConfig.logfile,
                maxBytes=100e6,
                backupCount=9,
                delay=0,
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not self.main_config.globalConfig.to_dict()["nologging"]:
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if self.main_config.globalConfig.to_dict()["loglevel"]:
            self.logger.setLevel(self.main_config.globalConfig.loglevel)

        # logging other modules
        for logger in ("message", "database", "storage", "tsdb"):
            logger_config = self.main_config.to_dict()[logger]
            logger_module = logging.getLogger(logger_config["logger_name"])
            if logger_config["logfile"]:
                file_handler = logging.handlers.RotatingFileHandler(
                    logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if logger_config["loglevel"]:
                logger_module.setLevel(logger_config["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(self.main_config.to_dict()).instance.db

            self.fs = Filesystem(self.main_config.to_dict()).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = self.main_config.message.to_dict()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        self.main_config.message.driver
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

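    # Probe the RO at startup, retrying up to 14 times with 5-second pauses before
    # giving up with an LcmException. Sets main_config.RO.ng according to whether the
    # new-generation or the legacy RO answered.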
    async def check_RO_version(self):
        tries = 14
        last_error = None
        while True:
            ro_uri = self.main_config.RO.uri
            if not ro_uri:
                ro_uri = ""
            try:
                # try the new-generation RO first; if it fails, fall back to the old RO
                try:
                    self.main_config.RO.uri = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = True
                except Exception:
                    self.main_config.RO.uri = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
                    ro_version = await ro_server.get_version()
                    self.main_config.RO.ng = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.main_config.RO.ng
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                self.main_config.RO.uri = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.main_config.RO.uri, e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Retrying for up to {} more seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)

    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
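                # Publish a ping addressed to ourselves on the "admin" topic.
                # kafka_read_callback resets pings_not_received when the ping comes
                # back, so a growing counter means the bus is not delivering messages.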
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # ping more frequently while booting or while pings are not coming back
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("Not receiving pings from the Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # on the first start, allow more retries so that Kafka has time to start
                if consecutive_errors == (8 if not first_start else 30):
                    self.logger.error(
                        "Task kafka_ping exits: too many consecutive errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_ping retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

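    # Dispatcher for every message read from the Kafka bus: routes (topic, command)
    # pairs to the corresponding engine, wrapping each operation in an asyncio task
    # that is registered in self.lcm_tasks so it can be tracked and cancelled later.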
    def kafka_read_callback(self, topic, command, params):
        order_id = 1

        if topic != "admin" or command != "ping":
            # log every received message except the frequent admin self-pings
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

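        # Self-ping received: touch the health-check file with the current timestamp
        # so that the --health-check mode (osm_lcm.lcm_hc) can verify this process
        # is alive.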
        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
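            # Bug 2175: handle the K8s cluster update commands ("edit"/"edited"),
            # launching K8sClusterLcm.edit the same way as the create/delete flows.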
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "edit" or command == "edited":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.edit(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
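        # NS lifecycle commands: params carry the ns-lcm-op-occ document, whose _id
        # (operation) and nsInstanceId (NS record) key the task registry entries.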
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id={}, nslcmop_id={}, task={}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO: cleanup of the task should be done here, just in case
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO: cleanup of the task should be done here, just in case
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
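        # VIM/WIM/SDN accounts: creation (and VIM editing) only spawns a task when
        # the legacy RO is in use (RO.ng false); with the new-generation RO these
        # operations are presumably handled by the RO itself.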
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO: cleanup of the task should be done here, just in case
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO: cleanup of the task should be done here, just in case
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.main_config.RO.ng:
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO: cleanup of the task should be done here, just in case
        self.logger.critical("unknown topic {} or command '{}'".format(topic, command))

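    # Main consumer loop: reads the operational topics with the shared group_id and,
    # through a second consumer with group_id disabled, the "admin" topic, so every
    # LCM worker receives the pings. Tolerates up to 10 consecutive read errors.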
    async def kafka_read(self):
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # on the first start, allow more retries so that Kafka has time to start
                if self.consecutive_errors == (8 if not self.first_start else 30):
                    self.logger.error(
                        "Task kafka_read exits: too many consecutive errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

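    # Entry point for the main thread: checks the RO version, builds the per-topic
    # engines and then runs kafka_read and kafka_ping concurrently until exit.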
    def start(self):

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
        # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.wim = vim_sdn.WimLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.sdn = vim_sdn.SdnLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.vca = vim_sdn.VcaLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
        )

        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        try:
            with open(config_file) as f:
                return yaml.safe_load(f)
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running inside docker, it returns the
        docker ID; otherwise a random one is provided
        :return: Obtained ID
        """
        # Try getting the docker id; if it fails, return a random id
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace("\n", "")[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return "".join(random_choice("0123456789abcdef") for _ in range(12))


def usage():
    print(
        """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
            sys.argv[0]
        )
    )
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found at the local folder or at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)