Feature 10911-Vertical scaling of VM instances from OSM
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from os import environ, path
48 from random import choice as random_choice
49 from n2vc import version as n2vc_version
50 import traceback
51
# Drop into the debugger at import time when OSMLCM_PDB_DEBUG is set (any value)
if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum compatible versions of the peer OSM components; checked at startup
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
# File touched by kafka_read_callback on each self-ping; external health checks
# read its timestamp to decide whether this LCM worker is alive
health_check_file = (
    path.expanduser("~") + "/time_last_ping"
)  # TODO find better location for this file
64
65
class Lcm:
    """OSM Life Cycle Management service: consumes kafka topics and dispatches
    NS/NSI/VIM/WIM/SDN/K8s/VCA operations to the corresponding engines."""

    ping_interval_pace = (
        120  # seconds between self-pings once the bus is confirmed to be working
    )
    ping_interval_boot = 5  # seconds between self-pings while booting / not yet confirmed
    cfg_logger_name = {
        "message": "lcm.msg",
        "database": "lcm.db",
        "storage": "lcm.fs",
        "tsdb": "lcm.prometheus",
    }
    # ^ contains for each section at lcm.cfg the used logger name
80 def __init__(self, config_file, loop=None):
81 """
82 Init, Connect to database, filesystem storage, and messaging
83 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 :return: None
85 """
86 self.db = None
87 self.msg = None
88 self.msg_admin = None
89 self.fs = None
90 self.pings_not_received = 1
91 self.consecutive_errors = 0
92 self.first_start = False
93
94 # logging
95 self.logger = logging.getLogger("lcm")
96 # get id
97 self.worker_id = self.get_process_id()
98 # load configuration
99 config = self.read_config_file(config_file)
100 self.config = config
101 self.config["ro_config"] = {
102 "ng": config["RO"].get("ng", False),
103 "uri": config["RO"].get("uri"),
104 "tenant": config.get("tenant", "osm"),
105 "logger_name": "lcm.roclient",
106 "loglevel": config["RO"].get("loglevel", "ERROR"),
107 }
108 if not self.config["ro_config"]["uri"]:
109 self.config["ro_config"]["uri"] = "http://{}:{}/".format(
110 config["RO"]["host"], config["RO"]["port"]
111 )
112 elif (
113 "/ro" in self.config["ro_config"]["uri"][-4:]
114 or "/openmano" in self.config["ro_config"]["uri"][-10:]
115 ):
116 # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
117 index = self.config["ro_config"]["uri"][-1].rfind("/")
118 self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index + 1]
119
120 self.loop = loop or asyncio.get_event_loop()
121 self.ns = (
122 self.netslice
123 ) = (
124 self.vim
125 ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
126
127 # logging
128 log_format_simple = (
129 "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
130 )
131 log_formatter_simple = logging.Formatter(
132 log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
133 )
134 config["database"]["logger_name"] = "lcm.db"
135 config["storage"]["logger_name"] = "lcm.fs"
136 config["message"]["logger_name"] = "lcm.msg"
137 if config["global"].get("logfile"):
138 file_handler = logging.handlers.RotatingFileHandler(
139 config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0
140 )
141 file_handler.setFormatter(log_formatter_simple)
142 self.logger.addHandler(file_handler)
143 if not config["global"].get("nologging"):
144 str_handler = logging.StreamHandler()
145 str_handler.setFormatter(log_formatter_simple)
146 self.logger.addHandler(str_handler)
147
148 if config["global"].get("loglevel"):
149 self.logger.setLevel(config["global"]["loglevel"])
150
151 # logging other modules
152 for k1, logname in self.cfg_logger_name.items():
153 config[k1]["logger_name"] = logname
154 logger_module = logging.getLogger(logname)
155 if config[k1].get("logfile"):
156 file_handler = logging.handlers.RotatingFileHandler(
157 config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
158 )
159 file_handler.setFormatter(log_formatter_simple)
160 logger_module.addHandler(file_handler)
161 if config[k1].get("loglevel"):
162 logger_module.setLevel(config[k1]["loglevel"])
163 self.logger.critical(
164 "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
165 )
166
167 # check version of N2VC
168 # TODO enhance with int conversion or from distutils.version import LooseVersion
169 # or with list(map(int, version.split(".")))
170 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
171 raise LcmException(
172 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
173 n2vc_version, min_n2vc_version
174 )
175 )
176 # check version of common
177 if versiontuple(common_version) < versiontuple(min_common_version):
178 raise LcmException(
179 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
180 common_version, min_common_version
181 )
182 )
183
184 try:
185 self.db = Database(config).instance.db
186
187 self.fs = Filesystem(config).instance.fs
188 self.fs.sync()
189
190 # copy message configuration in order to remove 'group_id' for msg_admin
191 config_message = config["message"].copy()
192 config_message["loop"] = self.loop
193 if config_message["driver"] == "local":
194 self.msg = msglocal.MsgLocal()
195 self.msg.connect(config_message)
196 self.msg_admin = msglocal.MsgLocal()
197 config_message.pop("group_id", None)
198 self.msg_admin.connect(config_message)
199 elif config_message["driver"] == "kafka":
200 self.msg = msgkafka.MsgKafka()
201 self.msg.connect(config_message)
202 self.msg_admin = msgkafka.MsgKafka()
203 config_message.pop("group_id", None)
204 self.msg_admin.connect(config_message)
205 else:
206 raise LcmException(
207 "Invalid configuration param '{}' at '[message]':'driver'".format(
208 config["message"]["driver"]
209 )
210 )
211 except (DbException, FsException, MsgException) as e:
212 self.logger.critical(str(e), exc_info=True)
213 raise LcmException(str(e))
214
215 # contains created tasks/futures to be able to cancel
216 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
217
    async def check_RO_version(self):
        """Poll RO until it answers with a compatible version.

        Tries the new-generation RO first (base uri + 'ro') and falls back to the
        classic one (base uri + 'openmano'); records which flavour answered in
        self.config["ro_config"]["ng"].  Retries up to 14 times with a 5 second
        sleep between attempts.

        :raises LcmException: when the reported version is too old or when all
            retries are exhausted without an answer
        """
        tries = 14
        last_error = None
        while True:
            # remember the base uri: each attempt appends a suffix to it
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try new RO, if fail old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                # NOTE(review): if this LcmException is raised, the suffixed uri is NOT
                # restored to ro_uri (the except below only catches RO client errors)
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException(
                        "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                            ro_version, min_RO_version
                        )
                    )
                self.logger.info(
                    "Connected to RO version {} new-generation version {}".format(
                        ro_version, self.config["ro_config"]["ng"]
                    )
                )
                return
            except (ROClientException, NgRoException) as e:
                # restore the base uri before retrying
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(
                    self.config["ro_config"]["uri"], e
                )
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # only log when the error changed, to avoid flooding the log
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(
                        error_text + ". Waiting until {} seconds".format(5 * tries)
                    )
                await asyncio.sleep(5)
263
264 async def test(self, param=None):
265 self.logger.debug("Starting/Ending test task: {}".format(param))
266
    async def kafka_ping(self):
        """Self-ping loop over the kafka 'admin' topic.

        Periodically writes a ping addressed to this worker_id; kafka_read_callback
        resets self.pings_not_received when the ping comes back.  More than 10
        consecutive unanswered pings mean the bus is not delivering messages.

        :raises LcmException: when pings stop being received, or after too many
            consecutive write errors
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin",
                    "ping",
                    {
                        "from": "lcm",
                        "to": "lcm",
                        "worker_id": self.worker_id,
                        "version": lcm_version,
                    },
                    self.loop,
                )
                # time between pings are low when it is not received and at starting
                wait_time = (
                    self.ping_interval_boot
                    if not kafka_has_received
                    else self.ping_interval_pace
                )
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                # NOTE(review): the loop= argument of asyncio.sleep was removed in
                # Python 3.10 — confirm the target interpreter version
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE(review): parses as 'consecutive_errors == (8 if not first_start else 30)';
                # i.e. the give-up threshold is 30 only while first_start is still True
                if consecutive_errors == 8 if not first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)
318
    def kafka_read_callback(self, topic, command, params):
        """Dispatch one kafka message to the corresponding LCM engine.

        Invoked by the aioread() consumers in kafka_read() for every message.
        Long-running operations are wrapped in an asyncio task and registered in
        self.lcm_tasks (so they can be cancelled later); short commands ('echo',
        'show', completion notifications, ...) are handled inline.

        :param topic: kafka topic ('ns', 'nsi', 'vim_account', 'admin', ...)
        :param command: operation within the topic ('instantiate', 'delete', ...)
        :param params: message payload, usually the operation document (dict) or an id
        """
        # NOTE(review): order_id is always 2 by the time it is used (incremented once
        # below and never again); it looks vestigial — confirm before relying on it
        order_id = 1

        # NOTE(review): with 'and', logging is skipped for EVERY message on 'admin'
        # and for every 'ping' on any topic; 'or' may have been intended if the goal
        # was to hide only admin pings
        if topic != "admin" and command != "ping":
            self.logger.debug(
                "Task kafka_read receives {} {}: {}".format(topic, command, params)
            )
        # any received message proves the bus works: reset the error counters
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # topic-independent commands
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            # commented-out command: ignore
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            # NOTE(review): asyncio.Task(..., loop=...) lost the loop= argument in
            # Python 3.10; asyncio.ensure_future is used everywhere else in this method
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            # self-ping handling: only react to our own worker's ping
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                # touch the health-check file so external probes see we are alive
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error(
                        "Cannot write into '{}' for healthcheck: {}".format(
                            health_check_file, e
                        )
                    )
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
                )
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
                )
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register(
                    "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
                )
                return
        elif topic == "ns":
            # NS lifecycle: params is the nslcmop document
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any ongoing operation on this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(
                    self.ns.vca_status_refresh(nsr_id, nslcmop_id)
                )
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                )
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "update":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "heal":
                # self.logger.debug("Healing NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                self.lcm_tasks.register(
                    "ns", nsr_id, nslcmop_id, "ns_heal", task
                )
                return
            elif command == "migrate":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                return
            elif command == "verticalscale":
                # vertical scaling (resize) of a VM inside the NS
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                self.logger.debug("nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_verticalscale", task)
                self.logger.debug("LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task))
                return
            elif command == "show":
                # debug helper: params is the nsr id, prints the record to stdout
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print(
                        "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsr_id,
                            db_nsr["operational-status"],
                            db_nsr["config-status"],
                            db_nsr["detailed-status"],
                            db_nsr["_admin"]["deployed"],
                            # NOTE(review): self.lcm_ns_tasks is not defined anywhere in this
                            # class; this raises AttributeError, silently swallowed by the
                            # except below — probably stale code, TODO confirm/remove
                            self.lcm_ns_tasks.get(nsr_id),
                        )
                    )
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "vnf_terminated",
                "policy_updated",
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
                "updated",
                "migrated",
                "verticalscaled",
            ):  # "scaled-cooldown-time"
                # completion notifications: nothing to do
                return

        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(
                    self.netslice.instantiate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
                )
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(
                    self.netslice.terminate(nsir_id, nsilcmop_id)
                )
                self.lcm_tasks.register(
                    "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
                )
                return
            elif command == "show":
                # debug helper: params is the nsir id, prints the record to stdout
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print(
                        "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                        "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                        "".format(
                            nsir_id,
                            db_nsir["operational-status"],
                            db_nsir["config-status"],
                            db_nsir["detailed-status"],
                            db_nsir["_admin"]["deployed"],
                            # NOTE(review): self.lcm_netslice_tasks is not defined in this
                            # class either; same stale-attribute issue as lcm_ns_tasks above
                            self.lcm_netslice_tasks.get(nsir_id),
                        )
                    )
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in (
                "terminated",
                "instantiated",
                "scaled",
                "healed",
                "actioned",
            ):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with new-generation RO the account is handled by RO itself: no task
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register(
                    "vim_account", vim_id, order_id, "vim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register(
                        "vim_account", vim_id, order_id, "vim_edit", task
                    )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register(
                        "wim_account", wim_id, order_id, "wim_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_delete", task
                )
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                # NOTE(review): unlike create, edit is launched even with ng RO — confirm
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register(
                    "wim_account", wim_id, order_id, "wim_edit", task
                )
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register(
                        "sdn", _sdn_id, order_id, "sdn_create", task
                    )
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
654
    async def kafka_read(self):
        """Main consumer loop: read the LCM topics and the admin topic until exit.

        Runs the regular reader and the admin reader concurrently, feeding every
        message to kafka_read_callback.  Retries on unexpected errors and gives up
        after 10 consecutive failures; LcmExceptionExit (raised by the callback on
        the 'exit' command) terminates the loop cleanly.
        """
        self.logger.debug(
            "Task kafka_read Enter with worker_id={}".format(self.worker_id)
        )
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = (
                    "ns",
                    "vim_account",
                    "wim_account",
                    "sdn",
                    "nsi",
                    "k8scluster",
                    "vca",
                    "k8srepo",
                    "pla",
                )
                topics_admin = ("admin",)
                await asyncio.gather(
                    self.msg.aioread(
                        topics, self.loop, self.kafka_read_callback, from_beginning=True
                    ),
                    # group_id=False: the admin reader is configured without a consumer
                    # group (see msg_admin setup in __init__, where group_id is popped)
                    self.msg_admin.aioread(
                        topics_admin,
                        self.loop,
                        self.kafka_read_callback,
                        group_id=False,
                    ),
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if not first_start is the first time after starting. So leave more time and wait
                # to allow kafka starts
                # NOTE(review): parses as
                # 'self.consecutive_errors == (8 if not self.first_start else 30)'
                if self.consecutive_errors == 8 if not self.first_start else 30:
                    self.logger.error(
                        "Task kafka_read task exit error too many errors. Exception: {}".format(
                            e
                        )
                    )
                    raise
                self.consecutive_errors += 1
                self.logger.error(
                    "Task kafka_read retrying after Exception {}".format(e)
                )
                wait_time = 2 if not self.first_start else 5
                # NOTE(review): loop= argument of asyncio.sleep removed in Python 3.10
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")
710
    def start(self):
        """Blocking entry point of the service.

        Waits for a compatible RO, instantiates one engine per topic, then runs
        kafka_read() and kafka_ping() on the event loop until exit, and finally
        disconnects database, messaging and storage.
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        # the netslice engine receives the ns engine to drive its nested NS operations
        self.netslice = netslice.NetsliceLcm(
            self.msg, self.lcm_tasks, self.config, self.loop, self.ns
        )
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(
            self.msg, self.lcm_tasks, self.config, self.loop
        )

        # blocks here until kafka_read()/kafka_ping() finish (exit command or fatal error)
        self.loop.run_until_complete(
            asyncio.gather(self.kafka_read(), self.kafka_ping())
        )

        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
755
756 def read_config_file(self, config_file):
757 # TODO make a [ini] + yaml inside parser
758 # the configparser library is not suitable, because it does not admit comments at the end of line,
759 # and not parse integer or boolean
760 try:
761 # read file as yaml format
762 with open(config_file) as f:
763 conf = yaml.load(f, Loader=yaml.Loader)
764 # Ensure all sections are not empty
765 for k in (
766 "global",
767 "timeout",
768 "RO",
769 "VCA",
770 "database",
771 "storage",
772 "message",
773 ):
774 if not conf.get(k):
775 conf[k] = {}
776
777 # read all environ that starts with OSMLCM_
778 for k, v in environ.items():
779 if not k.startswith("OSMLCM_"):
780 continue
781 subject, _, item = k[7:].lower().partition("_")
782 if not item:
783 continue
784 if subject in ("ro", "vca"):
785 # put in capital letter
786 subject = subject.upper()
787 try:
788 if item == "port" or subject == "timeout":
789 conf[subject][item] = int(v)
790 else:
791 conf[subject][item] = v
792 except Exception as e:
793 self.logger.warning(
794 "skipping environ '{}' on exception '{}'".format(k, e)
795 )
796
797 # backward compatibility of VCA parameters
798
799 if "pubkey" in conf["VCA"]:
800 conf["VCA"]["public_key"] = conf["VCA"].pop("pubkey")
801 if "cacert" in conf["VCA"]:
802 conf["VCA"]["ca_cert"] = conf["VCA"].pop("cacert")
803 if "apiproxy" in conf["VCA"]:
804 conf["VCA"]["api_proxy"] = conf["VCA"].pop("apiproxy")
805
806 if "enableosupgrade" in conf["VCA"]:
807 conf["VCA"]["enable_os_upgrade"] = conf["VCA"].pop("enableosupgrade")
808 if isinstance(conf["VCA"].get("enable_os_upgrade"), str):
809 if conf["VCA"]["enable_os_upgrade"].lower() == "false":
810 conf["VCA"]["enable_os_upgrade"] = False
811 elif conf["VCA"]["enable_os_upgrade"].lower() == "true":
812 conf["VCA"]["enable_os_upgrade"] = True
813
814 if "aptmirror" in conf["VCA"]:
815 conf["VCA"]["apt_mirror"] = conf["VCA"].pop("aptmirror")
816
817 return conf
818 except Exception as e:
819 self.logger.critical("At config file '{}': {}".format(config_file, e))
820 exit(1)
821
822 @staticmethod
823 def get_process_id():
824 """
825 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
826 will provide a random one
827 :return: Obtained ID
828 """
829 # Try getting docker id. If fails, get pid
830 try:
831 with open("/proc/self/cgroup", "r") as f:
832 text_id_ = f.readline()
833 _, _, text_id = text_id_.rpartition("/")
834 text_id = text_id.replace("\n", "")[:12]
835 if text_id:
836 return text_id
837 except Exception:
838 pass
839 # Return a random id
840 return "".join(random_choice("0123456789abcdef") for _ in range(12))
841
842
def usage():
    """Print the command-line help of the LCM service to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(
        sys.argv[0]
    )
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
855
856
857 if __name__ == "__main__":
858
859 try:
860 # print("SYS.PATH='{}'".format(sys.path))
861 # load parameters and configuration
862 # -h
863 # -c value
864 # --config value
865 # --help
866 # --health-check
867 opts, args = getopt.getopt(
868 sys.argv[1:], "hc:", ["config=", "help", "health-check"]
869 )
870 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
871 config_file = None
872 for o, a in opts:
873 if o in ("-h", "--help"):
874 usage()
875 sys.exit()
876 elif o in ("-c", "--config"):
877 config_file = a
878 elif o == "--health-check":
879 from osm_lcm.lcm_hc import health_check
880
881 health_check(health_check_file, Lcm.ping_interval_pace)
882 # elif o == "--log-socket-port":
883 # log_socket_port = a
884 # elif o == "--log-socket-host":
885 # log_socket_host = a
886 # elif o == "--log-file":
887 # log_file = a
888 else:
889 assert False, "Unhandled option"
890
891 if config_file:
892 if not path.isfile(config_file):
893 print(
894 "configuration file '{}' does not exist".format(config_file),
895 file=sys.stderr,
896 )
897 exit(1)
898 else:
899 for config_file in (
900 __file__[: __file__.rfind(".")] + ".cfg",
901 "./lcm.cfg",
902 "/etc/osm/lcm.cfg",
903 ):
904 if path.isfile(config_file):
905 break
906 else:
907 print(
908 "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
909 file=sys.stderr,
910 )
911 exit(1)
912 lcm = Lcm(config_file)
913 lcm.start()
914 except (LcmException, getopt.GetoptError) as e:
915 print(str(e), file=sys.stderr)
916 # usage()
917 exit(1)