Bug 2175: K8s-Cluster Update is not implemented
[osm/LCM.git] / osm_lcm / lcm.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_hc import get_health_check_file
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback

if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"


class Lcm:

    ping_interval_pace = (
        120  # seconds between pings once it is confirmed that everything is running
    )
    ping_interval_boot = 5  # seconds between pings while booting
    cfg_logger_name = {
        "message": "lcm.msg",
        "database": "lcm.db",
        "storage": "lcm.fs",
        "tsdb": "lcm.prometheus",
    }
    # ^ maps each section of lcm.cfg to the logger name used for it

    def __init__(self, config_file, loop=None):
        """
        Init: connect to database, filesystem storage and messaging
        :param config_file: path of the configuration file. It holds a two-level dictionary
            whose top level contains 'database', 'storage', 'message', etc.
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger("lcm")
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.health_check_file = get_health_check_file(self.config)
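        # this file gets a fresh timestamp in kafka_read_callback every time a
        # self-ping is received; the --health-check mode reads it to judge liveness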
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            self.config["ro_config"]["uri"] = "http://{}:{}/".format(
                config["RO"]["host"], config["RO"]["port"]
            )
        elif (
            "/ro" in self.config["ro_config"]["uri"][-4:]
            or "/openmano" in self.config["ro_config"]["uri"][-10:]
        ):
            # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
            index = self.config["ro_config"]["uri"][:-1].rfind("/")
            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][
                : index + 1
            ]
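            # e.g. "http://ro:9090/ro/" and "http://ro:9090/openmano" both
            # normalize to "http://ro:9090/"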

        self.loop = loop or asyncio.get_event_loop()
        self.ns = (
            self.netslice
        ) = (
            self.vim
        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None

        # logging
        log_format_simple = (
            "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        )
        log_formatter_simple = logging.Formatter(
            log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
        )
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(
                config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
            )
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in self.cfg_logger_name.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(
                    config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
                )
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical(
            "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
        )

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException(
                "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                    n2vc_version, min_n2vc_version
                )
            )
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            self.db = Database(config).instance.db

            self.fs = Filesystem(config).instance.fs
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        config["message"]["driver"]
                    )
                )
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

    async def check_RO_version(self):
        tries = 14
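        # 14 attempts with a 5-second pause between them (~70 s) while RO comes up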
        last_error = None
        while True:
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try new RO, if fail old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.config["ro_config"]["ng"]
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.config["ro_config"]["uri"], e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting up to {} more seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)

    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # pings are sent more frequently while booting and until one is
                # received back
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("Not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # on the first start allow more errors (30 instead of 8) to give
                # Kafka time to come up
                if consecutive_errors == (8 if not first_start else 30):
                    self.logger.error(
                        "Task kafka_ping task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_ping retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

    def kafka_read_callback(self, topic, command, params):
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
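                # the self-ping arrived: refresh the healthcheck file so external
                # liveness probes see a fresh timestamp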
                try:
                    with open(self.health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            self.health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
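            # Bug 2175 (K8s-cluster update not implemented): "edit"/"edited" now
            # triggers K8sClusterLcm.edit to carry out the cluster update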
            elif command == "edit" or command == "edited":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_heal", task
                )
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vertical_scale(nsr_id, nslcmop_id)
                )
                self.logger.debug(
                    "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
                )
                self.logger.debug(
                    "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
                )
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                        "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
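                # admin pings are read with group_id=False so each worker consumes
                # its own copy instead of sharing a Kafka consumer group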
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # on the first start allow more errors (30 instead of 8) to give
                # Kafka time to come up
                if self.consecutive_errors == (8 if not self.first_start else 30):
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.config, self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )

        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not allow comments
        # at the end of a line and does not parse integers or booleans
        try:
            # read file as yaml format
            with open(config_file) as f:
                conf = yaml.safe_load(f)
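            # Expected lcm.cfg layout (a sketch inferred from the sections handled
            # below; the exact keys depend on the deployment):
            #   global:   {loglevel: DEBUG, logfile: ..., nologging: false}
            #   timeout:  {...}
            #   RO:       {host: ro, port: 9090}
            #   VCA:      {host: ..., public_key: ..., ca_cert: ...}
            #   database: {driver: mongo, ...}
            #   storage:  {driver: local, ...}
            #   message:  {driver: kafka, host: kafka, port: 9092, group_id: ...}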
            # Ensure all sections are not empty
            for k in (
                "global",
                "timeout",
                "RO",
                "VCA",
                "database",
                "storage",
                "message",
            ):
                if not conf.get(k):
                    conf[k] = {}

            # read all environ that starts with OSMLCM_
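            # e.g. OSMLCM_RO_HOST=ro       -> conf["RO"]["host"] = "ro"
            #      OSMLCM_RO_PORT=9090     -> conf["RO"]["port"] = 9090 (cast to int)
            #      OSMLCM_DATABASE_HOST=db -> conf["database"]["host"] = "db"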
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # write the subject in capital letters
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning(
                        "skipping environ '{}' on exception '{}'".format(k, e)
                    )

            # backward compatibility of VCA parameters

            if "pubkey" in conf["VCA"]:
                conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
            if "cacert" in conf["VCA"]:
                conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
            if "apiproxy" in conf["VCA"]:
                conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")

            if "enableosupgrade" in conf["VCA"]:
                conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
            if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
                if conf["VCA"]["enable_os_upgrade"].lower() == "false":
                    conf["VCA"]["enable_os_upgrade"] = False
                elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
                    conf["VCA"]["enable_os_upgrade"] = True

            if "aptmirror" in conf["VCA"]:
                conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running inside docker, the docker ID is
        returned; otherwise a random one is provided
        :return: Obtained ID
        """
        # Try getting docker id. If it fails, generate a random id
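        # with docker and cgroup v1 the first line typically looks like
        # "12:devices:/docker/<64-hex-container-id>"; the last path segment,
        # truncated to 12 characters, is used as the id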
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace("\n", "")[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return "".join(random_choice("0123456789abcdef") for _ in range(12))


def usage():
    print(
        """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
            sys.argv[0]
        )
    )
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


if __name__ == "__main__":

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(
            sys.argv[1:], "hc:", ["config=", "help", "health-check"]
        )
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check

                health_check(config_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print(
                    "configuration file '{}' does not exist".format(config_file),
                    file=sys.stderr,
                )
                exit(1)
        else:
            for config_file in (
                __file__[: __file__.rfind(".")] + ".cfg",
                "./lcm.cfg",
                "/etc/osm/lcm.cfg",
            ):
                if path.isfile(config_file):
                    break
            else:
                print(
                    "No configuration file 'lcm.cfg' found at the local folder or at /etc/osm/",
                    file=sys.stderr,
                )
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)