LCM creates the health check file according to the configured storage path
[osm/LCM.git] / osm_lcm / lcm.py
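LCM pings itself over the Kafka "admin" topic and, each time one of its own pings comes back, writes the current timestamp into the file returned by get_health_check_file(config) (presumably derived from the configured storage path). The standalone --health-check mode (osm_lcm.lcm_hc.health_check) reads that file to decide whether the running LCM is healthy.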
#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_hc import get_health_check_file
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback

if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"


class Lcm:

    ping_interval_pace = (
        120  # how many time ping is send once is confirmed all is running
    )
    ping_interval_boot = 5  # how many time ping is sent when booting
    cfg_logger_name = {
        "message": "lcm.msg",
        "database": "lcm.db",
        "storage": "lcm.fs",
        "tsdb": "lcm.prometheus",
    }
    # ^ contains for each section at lcm.cfg the used logger name

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.health_check_file = get_health_check_file(self.config)
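        # RO client settings; the "ng" flag selects the new-generation RO API
        # (NgRoClient) instead of the legacy openmano client (see
        # check_RO_version below).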
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            self.config["ro_config"]["uri"] = "http://{}:{}/".format(
                config["RO"]["host"], config["RO"]["port"]
            )
        elif (
            "/ro" in self.config["ro_config"]["uri"][-4:]
            or "/openmano" in self.config["ro_config"]["uri"][-10:]
        ):
            # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
            index = self.config["ro_config"]["uri"][:-1].rfind("/")
            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][: index + 1]

        self.loop = loop or asyncio.get_event_loop()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(
                config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in self.cfg_logger_name.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(
                    config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(config).instance.db

            self.fs = Filesystem(config).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        config["message"]["driver"]
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

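    # Probe the RO API: first the new-generation endpoint (<uri> + "ro"), then
    # the legacy openmano endpoint, retrying up to 14 times with a 5-second
    # pause before giving up.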
    async def check_RO_version(self):
        tries = 14
        last_error = None
        while True:
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try new RO, if fail old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.config["ro_config"]["ng"]
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.config["ro_config"]["uri"], e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)

    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

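    # Self-ping over the Kafka "admin" topic: pings are sent every
    # ping_interval_boot seconds until the first one is received back, then
    # every ping_interval_pace seconds; kafka_read_callback() resets the
    # counter and touches the health-check file for each ping received.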
    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # time between pings are low when it is not received and at starting
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                if consecutive_errors == 8 if not first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

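    # Dispatch a single Kafka message: each (topic, command) pair is wrapped in
    # an asyncio task and registered in the TaskRegistry so that it can later
    # be cancelled (e.g. when a "terminate" arrives for the same instance).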
    def kafka_read_callback(self, topic, command, params):
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
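                # Record liveness: write the current timestamp into the health
                # check file so the standalone --health-check mode can verify
                # that this worker is still consuming the Kafka bus.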
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_heal", task
                )
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug("nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_verticalscale", task)
                self.logger.debug("LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task))
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_ns_tasks.get(nsr_id),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_netslice_tasks.get(nsir_id),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

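    # Main consumer loop: the regular topics are read with the shared consumer
    # group, while the "admin" topic is read with group_id=False so that every
    # worker sees its own ping messages.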
    async def kafka_read(self):
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                if self.consecutive_errors == 8 if not self.first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

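    # Blocking entry point: check the RO version, build one LCM engine per
    # topic family and run kafka_read() and kafka_ping() until exit, then
    # disconnect database, message bus and filesystem.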
    def start(self):

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.config, self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )

        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            # read file as yaml format
            with open(config_file) as f:
                conf = yaml.load(f, Loader=yaml.Loader)
            # Ensure all sections are not empty
            for k in (
                "global",
                "timeout",
                "RO",
                "VCA",
                "database",
                "storage",
                "message",
            ):
                if not conf.get(k):
                    conf[k] = {}

            # read all environ that starts with OSMLCM_
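            # e.g. OSMLCM_RO_HOST=ro     -> conf["RO"]["host"] = "ro"
            #      OSMLCM_DATABASE_URI=x -> conf["database"]["uri"] = "x"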
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # put in capital letter
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning(
                        "skipping environ '{}' on exception '{}'".format(k, e)
                    )

            # backward compatibility of VCA parameters

            if "pubkey" in conf["VCA"]:
                conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
            if "cacert" in conf["VCA"]:
                conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
            if "apiproxy" in conf["VCA"]:
                conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")

            if "enableosupgrade" in conf["VCA"]:
                conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
            if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
                if conf["VCA"]["enable_os_upgrade"].lower() == "false":
                    conf["VCA"]["enable_os_upgrade"] = False
                elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
                    conf["VCA"]["enable_os_upgrade"] = True

            if "aptmirror" in conf["VCA"]:
                conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
        will provide a random one
        :return: Obtained ID
        """
        # Try getting docker id. If fails, get pid
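        # A container cgroup entry typically looks like
        # "12:pids:/docker/<64-hex-container-id>"; the short id is the first
        # 12 characters of the last path component.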
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace("\n", "")[:12]
            if text_id:
                return text_id
        except Exception:
            pass
        # Return a random id
        return "".join(random_choice("0123456789abcdef") for _ in range(12))


def usage():
    print(
        """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
            sys.argv[0]
        )
    )
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)