#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_hc import get_health_check_file
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback

if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"

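# Lcm is the main service class: it connects to the database, the filesystem storage
# and the message bus, validates the RO/N2VC/common component versions, and then
# dispatches every kafka command to the ns, netslice, vim_sdn, vca and k8s handlers.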
class Lcm:

    ping_interval_pace = (
        120  # seconds between pings once it is confirmed that everything is running
    )
    ping_interval_boot = 5  # seconds between pings while booting
    cfg_logger_name = {
        "message": "lcm.msg",
        "database": "lcm.db",
        "storage": "lcm.fs",
        "tsdb": "lcm.prometheus",
    }
    # ^ maps each section of lcm.cfg to the logger name it uses

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path to the configuration file. It is parsed into a two level dictionary whose
            top level should contain 'global', 'timeout', 'RO', 'VCA', 'database', 'storage' and 'message'
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.health_check_file = get_health_check_file(self.config)
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            self.config["ro_config"]["uri"] = "http://{}:{}/".format(
                config["RO"]["host"], config["RO"]["port"]
            )
        elif (
            "/ro" in self.config["ro_config"]["uri"][-4:]
            or "/openmano" in self.config["ro_config"]["uri"][-10:]
        ):
            # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'; strip that suffix
            index = self.config["ro_config"]["uri"][:-1].rfind("/")
            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][
                : index + 1
            ]

        self.loop = loop or asyncio.get_event_loop()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(
                config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in self.cfg_logger_name.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(
                    config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(config).instance.db

            self.fs = Filesystem(config).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        config["message"]["driver"]
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

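    # Probe the RO endpoint: first try the new-generation RO API (uri + "ro") and,
    # if that fails, fall back to the legacy openmano client (uri + "openmano"),
    # retrying up to 14 times with a 5 second pause between attempts.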
    async def check_RO_version(self):
        tries = 14
        last_error = None
        while True:
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try new RO, if fail old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.config["ro_config"]["ng"]
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.config["ro_config"]["uri"], e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting up to {} more seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)

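    # Trivial coroutine used by the "test" command received through kafka (see kafka_read_callback).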
    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

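    # Periodically publish a "ping" on the admin topic addressed to this same worker;
    # kafka_read_callback resets pings_not_received when the ping is read back, and the
    # loop raises LcmException if more than 10 consecutive pings go unanswered.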
    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # the time between pings is short until the first ping is received back (i.e. while booting)
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if first_start, this is the first time after starting, so allow more
                # retries and wait longer for kafka to come up
                if consecutive_errors == (8 if not first_start else 30):
                    self.logger.error(
                        "Task kafka_ping exiting after too many consecutive errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_ping retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

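    # Single dispatcher for every kafka message: route each (topic, command) pair to the
    # matching handler (ns, nsi, vim_account, wim_account, sdn, k8scluster, vca, k8srepo,
    # pla, admin) and register the created asyncio task in self.lcm_tasks so that it can
    # be tracked and cancelled later.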
    def kafka_read_callback(self, topic, command, params):
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_ns_tasks.get(nsr_id),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_netslice_tasks.get(nsir_id),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

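    # Keep a long-lived aioread on both the regular and the admin kafka consumers, feeding
    # every message into kafka_read_callback; retry on errors (allowing extra retries right
    # after start-up) and exit cleanly on LcmExceptionExit.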
    async def kafka_read(self):
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if first_start, this is the first time after starting, so allow more
                # retries and wait longer for kafka to come up
                if self.consecutive_errors == (8 if not self.first_start else 30):
                    self.logger.error(
                        "Task kafka_read exiting after too many consecutive errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

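    # Wire everything together: check the RO version, instantiate the per-topic LCM
    # handlers, run kafka_read and kafka_ping until exit, and finally disconnect the
    # database, message bus and filesystem clients.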
    def start(self):

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.config, self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )

        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

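    # Load lcm.cfg as YAML, make sure the expected sections exist, apply overrides from
    # OSMLCM_<SECTION>_<ITEM> environment variables and normalize legacy VCA parameter names.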
    def read_config_file(self, config_file):
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not allow comments
        # at the end of a line and does not parse integers or booleans
        try:
            # read file as yaml format
            with open(config_file) as f:
                conf = yaml.safe_load(f)
            # Ensure all sections are present and not empty
            for k in (
                "global",
                "timeout",
                "RO",
                "VCA",
                "database",
                "storage",
                "message",
            ):
                if not conf.get(k):
                    conf[k] = {}

            # read all environment variables that start with OSMLCM_
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # put in capital letters
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning(
                        "skipping environ '{}' on exception '{}'".format(k, e)
                    )

            # backward compatibility of VCA parameters

            if "pubkey" in conf["VCA"]:
                conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
            if "cacert" in conf["VCA"]:
                conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
            if "apiproxy" in conf["VCA"]:
                conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")

            if "enableosupgrade" in conf["VCA"]:
                conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
            if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
                if conf["VCA"]["enable_os_upgrade"].lower() == "false":
                    conf["VCA"]["enable_os_upgrade"] = False
                elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
                    conf["VCA"]["enable_os_upgrade"] = True

            if "aptmirror" in conf["VCA"]:
                conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get the docker ID.
        If not, it will provide a random one
        :return: Obtained ID
        """
        # Try getting docker id. If fails, get pid
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace("\n", "")[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return "".join(random_choice("0123456789abcdef") for _ in range(12))


def usage():
    print(
        """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
            sys.argv[0]
        )
    )
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


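# Entry point: parse command line options, locate lcm.cfg and run the Lcm service
# (or only the kafka-based health check when invoked with --health-check).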
if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found at the local folder or at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)