Code Coverage

Cobertura Coverage Report > osm_lcm >

lcm.py

Trend

File Coverage summary

NameClassesLinesConditionals
lcm.py
100%
1/1
27%
134/497
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
lcm.py
27%
134/497
N/A

Source

osm_lcm/lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 #         http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 1 import os
23 1 import pdb
24
25 1 import asyncio
26 1 import yaml
27 1 import logging
28 1 import logging.handlers
29 1 import getopt
30 1 import sys
31
32 1 from osm_lcm import ns, vim_sdn, netslice
33 1 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 1 from osm_lcm.ROclient import ROClient, ROClientException
35
36 1 from time import time
37 1 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 1 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 1 from osm_common import msglocal, msgkafka
41 1 from osm_common import version as common_version
42 1 from osm_common.dbbase import DbException
43 1 from osm_common.fsbase import FsException
44 1 from osm_common.msgbase import MsgException
45 1 from osm_lcm.data_utils.database.database import Database
46 1 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 1 from osm_lcm.data_utils.lcm_config import LcmCfg
48 1 from osm_lcm.lcm_hc import get_health_check_file
49 1 from os import path
50 1 from random import choice as random_choice
51 1 from n2vc import version as n2vc_version
52 1 import traceback
53
54 1 if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
55 0     pdb.set_trace()
56
57
58 1 __author__ = "Alfonso Tierno"
59 1 min_RO_version = "6.0.2"
60 1 min_n2vc_version = "0.0.2"
61
62 1 min_common_version = "0.1.19"
63
64
65 1 class Lcm:
66 1     ping_interval_pace = (
67         120  # how many time ping is send once is confirmed all is running
68     )
69 1     ping_interval_boot = 5  # how many time ping is sent when booting
70
71 1     main_config = LcmCfg()
72
73 1     def __init__(self, config_file, loop=None):
74         """
75         Init, Connect to database, filesystem storage, and messaging
76         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
77         :return: None
78         """
79 1         self.db = None
80 1         self.msg = None
81 1         self.msg_admin = None
82 1         self.fs = None
83 1         self.pings_not_received = 1
84 1         self.consecutive_errors = 0
85 1         self.first_start = False
86
87         # logging
88 1         self.logger = logging.getLogger("lcm")
89         # get id
90 1         self.worker_id = self.get_process_id()
91         # load configuration
92 1         config = self.read_config_file(config_file)
93 1         self.main_config.set_from_dict(config)
94 1         self.main_config.transform()
95 1         self.main_config.load_from_env()
96 1         self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
97         # TODO: check if lcm_hc.py is necessary
98 1         self.health_check_file = get_health_check_file(self.main_config.to_dict())
99 1         self.loop = loop or asyncio.get_event_loop()
100 1         self.ns = (
101             self.netslice
102         ) = (
103             self.vim
104         ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
105
106         # logging
107 1         log_format_simple = (
108             "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
109         )
110 1         log_formatter_simple = logging.Formatter(
111             log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
112         )
113 1         if self.main_config.globalConfig.logfile:
114 0             file_handler = logging.handlers.RotatingFileHandler(
115                 self.main_config.globalConfig.logfile,
116                 maxBytes=100e6,
117                 backupCount=9,
118                 delay=0,
119             )
120 0             file_handler.setFormatter(log_formatter_simple)
121 0             self.logger.addHandler(file_handler)
122 1         if not self.main_config.globalConfig.to_dict()["nologging"]:
123 1             str_handler = logging.StreamHandler()
124 1             str_handler.setFormatter(log_formatter_simple)
125 1             self.logger.addHandler(str_handler)
126
127 1         if self.main_config.globalConfig.to_dict()["loglevel"]:
128 1             self.logger.setLevel(self.main_config.globalConfig.loglevel)
129
130         # logging other modules
131 1         for logger in ("message", "database", "storage", "tsdb"):
132 1             logger_config = self.main_config.to_dict()[logger]
133 1             logger_module = logging.getLogger(logger_config["logger_name"])
134 1             if logger_config["logfile"]:
135 0                 file_handler = logging.handlers.RotatingFileHandler(
136                     logger_config["logfile"], maxBytes=100e6, backupCount=9, delay=0
137                 )
138 0                 file_handler.setFormatter(log_formatter_simple)
139 0                 logger_module.addHandler(file_handler)
140 1             if logger_config["loglevel"]:
141 1                 logger_module.setLevel(logger_config["loglevel"])
142 1         self.logger.critical(
143             "starting osm/lcm version {} {}".format(lcm_version, lcm_version_date)
144         )
145
146         # check version of N2VC
147         # TODO enhance with int conversion or from distutils.version import LooseVersion
148         # or with list(map(int, version.split(".")))
149 1         if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
150 0             raise LcmException(
151                 "Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
152                     n2vc_version, min_n2vc_version
153                 )
154             )
155         # check version of common
156 1         if versiontuple(common_version) < versiontuple(min_common_version):
157 0             raise LcmException(
158                 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
159                     common_version, min_common_version
160                 )
161             )
162
163 1         try:
164 1             self.db = Database(self.main_config.to_dict()).instance.db
165
166 1             self.fs = Filesystem(self.main_config.to_dict()).instance.fs
167 1             self.fs.sync()
168
169             # copy message configuration in order to remove 'group_id' for msg_admin
170 1             config_message = self.main_config.message.to_dict()
171 1             config_message["loop"] = self.loop
172 1             if config_message["driver"] == "local":
173 1                 self.msg = msglocal.MsgLocal()
174 1                 self.msg.connect(config_message)
175 1                 self.msg_admin = msglocal.MsgLocal()
176 1                 config_message.pop("group_id", None)
177 1                 self.msg_admin.connect(config_message)
178 0             elif config_message["driver"] == "kafka":
179 0                 self.msg = msgkafka.MsgKafka()
180 0                 self.msg.connect(config_message)
181 0                 self.msg_admin = msgkafka.MsgKafka()
182 0                 config_message.pop("group_id", None)
183 0                 self.msg_admin.connect(config_message)
184             else:
185 0                 raise LcmException(
186                     "Invalid configuration param '{}' at '[message]':'driver'".format(
187                         self.main_config.message.driver
188                     )
189                 )
190 0         except (DbException, FsException, MsgException) as e:
191 0             self.logger.critical(str(e), exc_info=True)
192 0             raise LcmException(str(e))
193
194         # contains created tasks/futures to be able to cancel
195 1         self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
196
197 1     async def check_RO_version(self):
198 0         tries = 14
199 0         last_error = None
200         while True:
201 0             ro_uri = self.main_config.RO.uri
202 0             if not ro_uri:
203 0                 ro_uri = ""
204 0             try:
205                 # try new  RO, if fail old RO
206 0                 try:
207 0                     self.main_config.RO.uri = ro_uri + "ro"
208 0                     ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
209 0                     ro_version = await ro_server.get_version()
210 0                     self.main_config.RO.ng = True
211 0                 except Exception:
212 0                     self.main_config.RO.uri = ro_uri + "openmano"
213 0                     ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
214 0                     ro_version = await ro_server.get_version()
215 0                     self.main_config.RO.ng = False
216 0                 if versiontuple(ro_version) < versiontuple(min_RO_version):
217 0                     raise LcmException(
218                         "Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
219                             ro_version, min_RO_version
220                         )
221                     )
222 0                 self.logger.info(
223                     "Connected to RO version {} new-generation version {}".format(
224                         ro_version, self.main_config.RO.ng
225                     )
226                 )
227 0                 return
228 0             except (ROClientException, NgRoException) as e:
229 0                 self.main_config.RO.uri = ro_uri
230 0                 tries -= 1
231 0                 traceback.print_tb(e.__traceback__)
232 0                 error_text = "Error while connecting to RO on {}: {}".format(
233                     self.main_config.RO.uri, e
234                 )
235 0                 if tries <= 0:
236 0                     self.logger.critical(error_text)
237 0                     raise LcmException(error_text)
238 0                 if last_error != error_text:
239 0                     last_error = error_text
240 0                     self.logger.error(
241                         error_text + ". Waiting until {} seconds".format(5 * tries)
242                     )
243 0                 await asyncio.sleep(5)
244
245 1     async def test(self, param=None):
246 0         self.logger.debug("Starting/Ending test task: {}".format(param))
247
248 1     async def kafka_ping(self):
249 0         self.logger.debug("Task kafka_ping Enter")
250 0         consecutive_errors = 0
251 0         first_start = True
252 0         kafka_has_received = False
253 0         self.pings_not_received = 1
254         while True:
255 0             try:
256 0                 await self.msg_admin.aiowrite(
257                     "admin",
258                     "ping",
259                     {
260                         "from": "lcm",
261                         "to": "lcm",
262                         "worker_id": self.worker_id,
263                         "version": lcm_version,
264                     },
265                     self.loop,
266                 )
267                 # time between pings are low when it is not received and at starting
268 0                 wait_time = (
269                     self.ping_interval_boot
270                     if not kafka_has_received
271                     else self.ping_interval_pace
272                 )
273 0                 if not self.pings_not_received:
274 0                     kafka_has_received = True
275 0                 self.pings_not_received += 1
276 0                 await asyncio.sleep(wait_time, loop=self.loop)
277 0                 if self.pings_not_received > 10:
278 0                     raise LcmException("It is not receiving pings from Kafka bus")
279 0                 consecutive_errors = 0
280 0                 first_start = False
281 0             except LcmException:
282 0                 raise
283 0             except Exception as e:
284                 # if not first_start is the first time after starting. So leave more time and wait
285                 # to allow kafka starts
286 0                 if consecutive_errors == 8 if not first_start else 30:
287 0                     self.logger.error(
288                         "Task kafka_read task exit error too many errors. Exception: {}".format(
289                             e
290                         )
291                     )
292 0                     raise
293 0                 consecutive_errors += 1
294 0                 self.logger.error(
295                     "Task kafka_read retrying after Exception {}".format(e)
296                 )
297 0                 wait_time = 2 if not first_start else 5
298 0                 await asyncio.sleep(wait_time, loop=self.loop)
299
300 1     def kafka_read_callback(self, topic, command, params):
301 1         order_id = 1
302
303 1         if topic != "admin" and command != "ping":
304 0             self.logger.debug(
305                 "Task kafka_read receives {} {}: {}".format(topic, command, params)
306             )
307 1         self.consecutive_errors = 0
308 1         self.first_start = False
309 1         order_id += 1
310 1         if command == "exit":
311 0             raise LcmExceptionExit
312 1         elif command.startswith("#"):
313 0             return
314 1         elif command == "echo":
315             # just for test
316 0             print(params)
317 0             sys.stdout.flush()
318 0             return
319 1         elif command == "test":
320 0             asyncio.Task(self.test(params), loop=self.loop)
321 0             return
322
323 1         if topic == "admin":
324 1             if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
325 1                 if params.get("worker_id") != self.worker_id:
326 1                     return
327 1                 self.pings_not_received = 0
328 1                 try:
329 1                     with open(self.health_check_file, "w") as f:
330 1                         f.write(str(time()))
331 0                 except Exception as e:
332 0                     self.logger.error(
333                         "Cannot write into '{}' for healthcheck: {}".format(
334                             self.health_check_file, e
335                         )
336                     )
337 1             return
338 1         elif topic == "pla":
339 0             if command == "placement":
340 0                 self.ns.update_nsrs_with_pla_result(params)
341 0             return
342 1         elif topic == "k8scluster":
343 0             if command == "create" or command == "created":
344 0                 k8scluster_id = params.get("_id")
345 0                 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
346 0                 self.lcm_tasks.register(
347                     "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
348                 )
349 0                 return
350 0             elif command == "delete" or command == "deleted":
351 0                 k8scluster_id = params.get("_id")
352 0                 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
353 0                 self.lcm_tasks.register(
354                     "k8scluster", k8scluster_id, order_id, "k8scluster_delete", task
355                 )
356 0                 return
357 1         elif topic == "vca":
358 0             if command == "create" or command == "created":
359 0                 vca_id = params.get("_id")
360 0                 task = asyncio.ensure_future(self.vca.create(params, order_id))
361 0                 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
362 0                 return
363 0             elif command == "delete" or command == "deleted":
364 0                 vca_id = params.get("_id")
365 0                 task = asyncio.ensure_future(self.vca.delete(params, order_id))
366 0                 self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
367 0                 return
368 1         elif topic == "k8srepo":
369 0             if command == "create" or command == "created":
370 0                 k8srepo_id = params.get("_id")
371 0                 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
372 0                 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
373 0                 self.lcm_tasks.register(
374                     "k8srepo", k8srepo_id, order_id, "k8srepo_create", task
375                 )
376 0                 return
377 0             elif command == "delete" or command == "deleted":
378 0                 k8srepo_id = params.get("_id")
379 0                 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
380 0                 self.lcm_tasks.register(
381                     "k8srepo", k8srepo_id, order_id, "k8srepo_delete", task
382                 )
383 0                 return
384 1         elif topic == "ns":
385 0             if command == "instantiate":
386                 # self.logger.debug("Deploying NS {}".format(nsr_id))
387 0                 nslcmop = params
388 0                 nslcmop_id = nslcmop["_id"]
389 0                 nsr_id = nslcmop["nsInstanceId"]
390 0                 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
391 0                 self.lcm_tasks.register(
392                     "ns", nsr_id, nslcmop_id, "ns_instantiate", task
393                 )
394 0                 return
395 0             elif command == "terminate":
396                 # self.logger.debug("Deleting NS {}".format(nsr_id))
397 0                 nslcmop = params
398 0                 nslcmop_id = nslcmop["_id"]
399 0                 nsr_id = nslcmop["nsInstanceId"]
400 0                 self.lcm_tasks.cancel(topic, nsr_id)
401 0                 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
402 0                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
403 0                 return
404 0             elif command == "vca_status_refresh":
405 0                 nslcmop = params
406 0                 nslcmop_id = nslcmop["_id"]
407 0                 nsr_id = nslcmop["nsInstanceId"]
408 0                 task = asyncio.ensure_future(
409                     self.ns.vca_status_refresh(nsr_id, nslcmop_id)
410                 )
411 0                 self.lcm_tasks.register(
412                     "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
413                 )
414 0                 return
415 0             elif command == "action":
416                 # self.logger.debug("Update NS {}".format(nsr_id))
417 0                 nslcmop = params
418 0                 nslcmop_id = nslcmop["_id"]
419 0                 nsr_id = nslcmop["nsInstanceId"]
420 0                 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
421 0                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
422 0                 return
423 0             elif command == "update":
424                 # self.logger.debug("Update NS {}".format(nsr_id))
425 0                 nslcmop = params
426 0                 nslcmop_id = nslcmop["_id"]
427 0                 nsr_id = nslcmop["nsInstanceId"]
428 0                 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
429 0                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
430 0                 return
431 0             elif command == "scale":
432                 # self.logger.debug("Update NS {}".format(nsr_id))
433 0                 nslcmop = params
434 0                 nslcmop_id = nslcmop["_id"]
435 0                 nsr_id = nslcmop["nsInstanceId"]
436 0                 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
437 0                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
438 0                 return
439 0             elif command == "heal":
440                 # self.logger.debug("Healing NS {}".format(nsr_id))
441 0                 nslcmop = params
442 0                 nslcmop_id = nslcmop["_id"]
443 0                 nsr_id = nslcmop["nsInstanceId"]
444 0                 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
445 0                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
446 0                 return
447 0             elif command == "migrate":
448 0                 nslcmop = params
449 0                 nslcmop_id = nslcmop["_id"]
450 0                 nsr_id = nslcmop["nsInstanceId"]
451 0                 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
452 0                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
453 0                 return
454 0             elif command == "verticalscale":
455 0                 nslcmop = params
456 0                 nslcmop_id = nslcmop["_id"]
457 0                 nsr_id = nslcmop["nsInstanceId"]
458 0                 task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
459 0                 self.logger.debug(
460                     "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
461                 )
462 0                 self.lcm_tasks.register(
463                     "ns", nsr_id, nslcmop_id, "ns_verticalscale", task
464                 )
465 0                 self.logger.debug(
466                     "LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)
467                 )
468 0                 return
469 0             elif command == "show":
470 0                 nsr_id = params
471 0                 try:
472 0                     db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
473 0                     print(
474                         "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
475                         "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
476                         "".format(
477                             nsr_id,
478                             db_nsr["operational-status"],
479                             db_nsr["config-status"],
480                             db_nsr["detailed-status"],
481                             db_nsr["_admin"]["deployed"],
482                             self.lcm_ns_tasks.get(nsr_id),
483                         )
484                     )
485 0                 except Exception as e:
486 0                     print("nsr {} not found: {}".format(nsr_id, e))
487 0                 sys.stdout.flush()
488 0                 return
489 0             elif command == "deleted":
490 0                 return  # TODO cleaning of task just in case should be done
491 0             elif command in (
492                 "vnf_terminated",
493                 "policy_updated",
494                 "terminated",
495                 "instantiated",
496                 "scaled",
497                 "healed",
498                 "actioned",
499                 "updated",
500                 "migrated",
501                 "verticalscaled",
502             ):  # "scaled-cooldown-time"
503 0                 return
504
505 1         elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
506 0             if command == "instantiate":
507                 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
508 0                 nsilcmop = params
509 0                 nsilcmop_id = nsilcmop["_id"]  # slice operation id
510 0                 nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
511 0                 task = asyncio.ensure_future(
512                     self.netslice.instantiate(nsir_id, nsilcmop_id)
513                 )
514 0                 self.lcm_tasks.register(
515                     "nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task
516                 )
517 0                 return
518 0             elif command == "terminate":
519                 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
520 0                 nsilcmop = params
521 0                 nsilcmop_id = nsilcmop["_id"]  # slice operation id
522 0                 nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
523 0                 self.lcm_tasks.cancel(topic, nsir_id)
524 0                 task = asyncio.ensure_future(
525                     self.netslice.terminate(nsir_id, nsilcmop_id)
526                 )
527 0                 self.lcm_tasks.register(
528                     "nsi", nsir_id, nsilcmop_id, "nsi_terminate", task
529                 )
530 0                 return
531 0             elif command == "show":
532 0                 nsir_id = params
533 0                 try:
534 0                     db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
535 0                     print(
536                         "nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
537                         "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
538                         "".format(
539                             nsir_id,
540                             db_nsir["operational-status"],
541                             db_nsir["config-status"],
542                             db_nsir["detailed-status"],
543                             db_nsir["_admin"]["deployed"],
544                             self.lcm_netslice_tasks.get(nsir_id),
545                         )
546                     )
547 0                 except Exception as e:
548 0                     print("nsir {} not found: {}".format(nsir_id, e))
549 0                 sys.stdout.flush()
550 0                 return
551 0             elif command == "deleted":
552 0                 return  # TODO cleaning of task just in case should be done
553 0             elif command in (
554                 "terminated",
555                 "instantiated",
556                 "scaled",
557                 "healed",
558                 "actioned",
559             ):  # "scaled-cooldown-time"
560 0                 return
561 1         elif topic == "vim_account":
562 0             vim_id = params["_id"]
563 0             if command in ("create", "created"):
564 0                 if not self.main_config.RO.ng:
565 0                     task = asyncio.ensure_future(self.vim.create(params, order_id))
566 0                     self.lcm_tasks.register(
567                         "vim_account", vim_id, order_id, "vim_create", task
568                     )
569 0                 return
570 0             elif command == "delete" or command == "deleted":
571 0                 self.lcm_tasks.cancel(topic, vim_id)
572 0                 task = asyncio.ensure_future(self.vim.delete(params, order_id))
573 0                 self.lcm_tasks.register(
574                     "vim_account", vim_id, order_id, "vim_delete", task
575                 )
576 0                 return
577 0             elif command == "show":
578 0                 print("not implemented show with vim_account")
579 0                 sys.stdout.flush()
580 0                 return
581 0             elif command in ("edit", "edited"):
582 0                 if not self.main_config.RO.ng:
583 0                     task = asyncio.ensure_future(self.vim.edit(params, order_id))
584 0                     self.lcm_tasks.register(
585                         "vim_account", vim_id, order_id, "vim_edit", task
586                     )
587 0                 return
588 0             elif command == "deleted":
589 0                 return  # TODO cleaning of task just in case should be done
590 1         elif topic == "wim_account":
591 0             wim_id = params["_id"]
592 0             if command in ("create", "created"):
593 0                 if not self.main_config.RO.ng:
594 0                     task = asyncio.ensure_future(self.wim.create(params, order_id))
595 0                     self.lcm_tasks.register(
596                         "wim_account", wim_id, order_id, "wim_create", task
597                     )
598 0                 return
599 0             elif command == "delete" or command == "deleted":
600 0                 self.lcm_tasks.cancel(topic, wim_id)
601 0                 task = asyncio.ensure_future(self.wim.delete(params, order_id))
602 0                 self.lcm_tasks.register(
603                     "wim_account", wim_id, order_id, "wim_delete", task
604                 )
605 0                 return
606 0             elif command == "show":
607 0                 print("not implemented show with wim_account")
608 0                 sys.stdout.flush()
609 0                 return
610 0             elif command in ("edit", "edited"):
611 0                 task = asyncio.ensure_future(self.wim.edit(params, order_id))
612 0                 self.lcm_tasks.register(
613                     "wim_account", wim_id, order_id, "wim_edit", task
614                 )
615 0                 return
616 0             elif command == "deleted":
617 0                 return  # TODO cleaning of task just in case should be done
618 1         elif topic == "sdn":
619 0             _sdn_id = params["_id"]
620 0             if command in ("create", "created"):
621 0                 if not self.main_config.RO.ng:
622 0                     task = asyncio.ensure_future(self.sdn.create(params, order_id))
623 0                     self.lcm_tasks.register(
624                         "sdn", _sdn_id, order_id, "sdn_create", task
625                     )
626 0                 return
627 0             elif command == "delete" or command == "deleted":
628 0                 self.lcm_tasks.cancel(topic, _sdn_id)
629 0                 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
630 0                 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
631 0                 return
632 0             elif command in ("edit", "edited"):
633 0                 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
634 0                 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
635 0                 return
636 0             elif command == "deleted":
637 0                 return  # TODO cleaning of task just in case should be done
638 1         self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
639
640 1     async def kafka_read(self):
641 0         self.logger.debug(
642             "Task kafka_read Enter with worker_id={}".format(self.worker_id)
643         )
644         # future = asyncio.Future()
645 0         self.consecutive_errors = 0
646 0         self.first_start = True
647 0         while self.consecutive_errors < 10:
648 0             try:
649 0                 topics = (
650                     "ns",
651                     "vim_account",
652                     "wim_account",
653                     "sdn",
654                     "nsi",
655                     "k8scluster",
656                     "vca",
657                     "k8srepo",
658                     "pla",
659                 )
660 0                 topics_admin = ("admin",)
661 0                 await asyncio.gather(
662                     self.msg.aioread(
663                         topics, self.loop, self.kafka_read_callback, from_beginning=True
664                     ),
665                     self.msg_admin.aioread(
666                         topics_admin,
667                         self.loop,
668                         self.kafka_read_callback,
669                         group_id=False,
670                     ),
671                 )
672
673 0             except LcmExceptionExit:
674 0                 self.logger.debug("Bye!")
675 0                 break
676 0             except Exception as e:
677                 # if not first_start is the first time after starting. So leave more time and wait
678                 # to allow kafka starts
679 0                 if self.consecutive_errors == 8 if not self.first_start else 30:
680 0                     self.logger.error(
681                         "Task kafka_read task exit error too many errors. Exception: {}".format(
682                             e
683                         )
684                     )
685 0                     raise
686 0                 self.consecutive_errors += 1
687 0                 self.logger.error(
688                     "Task kafka_read retrying after Exception {}".format(e)
689                 )
690 0                 wait_time = 2 if not self.first_start else 5
691 0                 await asyncio.sleep(wait_time, loop=self.loop)
692
693         # self.logger.debug("Task kafka_read terminating")
694 0         self.logger.debug("Task kafka_read exit")
695
696 1     def start(self):
697         # check RO version
698 0         self.loop.run_until_complete(self.check_RO_version())
699
700 0         self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
701         # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
702 0         self.netslice = netslice.NetsliceLcm(
703             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
704         )
705 0         self.vim = vim_sdn.VimLcm(
706             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
707         )
708 0         self.wim = vim_sdn.WimLcm(
709             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
710         )
711 0         self.sdn = vim_sdn.SdnLcm(
712             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
713         )
714 0         self.k8scluster = vim_sdn.K8sClusterLcm(
715             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
716         )
717 0         self.vca = vim_sdn.VcaLcm(
718             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
719         )
720 0         self.k8srepo = vim_sdn.K8sRepoLcm(
721             self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
722         )
723
724 0         self.loop.run_until_complete(
725             asyncio.gather(self.kafka_read(), self.kafka_ping())
726         )
727
728         # TODO
729         # self.logger.debug("Terminating cancelling creation tasks")
730         # self.lcm_tasks.cancel("ALL", "create")
731         # timeout = 200
732         # while self.is_pending_tasks():
733         #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
734         #     await asyncio.sleep(2, loop=self.loop)
735         #     timeout -= 2
736         #     if not timeout:
737         #         self.lcm_tasks.cancel("ALL", "ALL")
738 0         self.loop.close()
739 0         self.loop = None
740 0         if self.db:
741 0             self.db.db_disconnect()
742 0         if self.msg:
743 0             self.msg.disconnect()
744 0         if self.msg_admin:
745 0             self.msg_admin.disconnect()
746 0         if self.fs:
747 0             self.fs.fs_disconnect()
748
749 1     def read_config_file(self, config_file):
750 1         try:
751 1             with open(config_file) as f:
752 1                 return yaml.safe_load(f)
753 0         except Exception as e:
754 0             self.logger.critical("At config file '{}': {}".format(config_file, e))
755 0             exit(1)
756
757 1     @staticmethod
758 1     def get_process_id():
759         """
760         Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
761         will provide a random one
762         :return: Obtained ID
763         """
764         # Try getting docker id. If fails, get pid
765 1         try:
766 1             with open("/proc/self/cgroup", "r") as f:
767 1                 text_id_ = f.readline()
768 1                 _, _, text_id = text_id_.rpartition("/")
769 1                 text_id = text_id.replace("\n", "")[:12]
770 1                 if text_id:
771 0                     return text_id
772 0         except Exception:
773 0             pass
774         # Return a random id
775 1         return "".join(random_choice("0123456789abcdef") for _ in range(12))
776
777
778 1 def usage():
779 0     print(
780         """Usage: {} [options]
781         -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
782         --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
783         -h|--help: shows this help
784         """.format(
785             sys.argv[0]
786         )
787     )
788     # --log-socket-host HOST: send logs to this host")
789     # --log-socket-port PORT: send logs using this port (default: 9022)")
790
791
792 1 if __name__ == "__main__":
793 0     try:
794         # print("SYS.PATH='{}'".format(sys.path))
795         # load parameters and configuration
796         # -h
797         # -c value
798         # --config value
799         # --help
800         # --health-check
801 0         opts, args = getopt.getopt(
802             sys.argv[1:], "hc:", ["config=", "help", "health-check"]
803         )
804         # TODO add  "log-socket-host=", "log-socket-port=", "log-file="
805 0         config_file = None
806 0         for o, a in opts:
807 0             if o in ("-h", "--help"):
808 0                 usage()
809 0                 sys.exit()
810 0             elif o in ("-c", "--config"):
811 0                 config_file = a
812 0             elif o == "--health-check":
813 0                 from osm_lcm.lcm_hc import health_check
814
815 0                 health_check(config_file, Lcm.ping_interval_pace)
816             # elif o == "--log-socket-port":
817             #     log_socket_port = a
818             # elif o == "--log-socket-host":
819             #     log_socket_host = a
820             # elif o == "--log-file":
821             #     log_file = a
822             else:
823 0                 assert False, "Unhandled option"
824
825 0         if config_file:
826 0             if not path.isfile(config_file):
827 0                 print(
828                     "configuration file '{}' does not exist".format(config_file),
829                     file=sys.stderr,
830                 )
831 0                 exit(1)
832         else:
833 0             for config_file in (
834                 __file__[: __file__.rfind(".")] + ".cfg",
835                 "./lcm.cfg",
836                 "/etc/osm/lcm.cfg",
837             ):
838 0                 if path.isfile(config_file):
839 0                     break
840             else:
841 0                 print(
842                     "No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
843                     file=sys.stderr,
844                 )
845 0                 exit(1)
846 0         lcm = Lcm(config_file)
847 0         lcm.start()
848 0     except (LcmException, getopt.GetoptError) as e:
849 0         print(str(e), file=sys.stderr)
850         # usage()
851 0         exit(1)