Ignore kafka message ns-nsi/created
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time, sleep
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
50 if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
51 pdb.set_trace()
52
53
54 __author__ = "Alfonso Tierno"
55 min_RO_version = "6.0.2"
56 min_n2vc_version = "0.0.2"
57
58 min_common_version = "0.1.19"
59 health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file
60
61
62 class Lcm:
63
64 ping_interval_pace = 120 # how many time ping is send once is confirmed all is running
65 ping_interval_boot = 5 # how many time ping is sent when booting
66
67 def __init__(self, config_file, loop=None):
68 """
69 Init, Connect to database, filesystem storage, and messaging
70 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
71 :return: None
72 """
73 self.db = None
74 self.msg = None
75 self.msg_admin = None
76 self.fs = None
77 self.pings_not_received = 1
78 self.consecutive_errors = 0
79 self.first_start = False
80
81 # logging
82 self.logger = logging.getLogger('lcm')
83 # get id
84 self.worker_id = self.get_process_id()
85 # load configuration
86 config = self.read_config_file(config_file)
87 self.config = config
88 self.config["ro_config"] = {
89 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
90 "tenant": config.get("tenant", "osm"),
91 "logger_name": "lcm.ROclient",
92 "loglevel": "ERROR",
93 }
94
95 self.loop = loop or asyncio.get_event_loop()
96
97 # logging
98 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
99 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
100 config["database"]["logger_name"] = "lcm.db"
101 config["storage"]["logger_name"] = "lcm.fs"
102 config["message"]["logger_name"] = "lcm.msg"
103 if config["global"].get("logfile"):
104 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
105 maxBytes=100e6, backupCount=9, delay=0)
106 file_handler.setFormatter(log_formatter_simple)
107 self.logger.addHandler(file_handler)
108 if not config["global"].get("nologging"):
109 str_handler = logging.StreamHandler()
110 str_handler.setFormatter(log_formatter_simple)
111 self.logger.addHandler(str_handler)
112
113 if config["global"].get("loglevel"):
114 self.logger.setLevel(config["global"]["loglevel"])
115
116 # logging other modules
117 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
118 config[k1]["logger_name"] = logname
119 logger_module = logging.getLogger(logname)
120 if config[k1].get("logfile"):
121 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
122 maxBytes=100e6, backupCount=9, delay=0)
123 file_handler.setFormatter(log_formatter_simple)
124 logger_module.addHandler(file_handler)
125 if config[k1].get("loglevel"):
126 logger_module.setLevel(config[k1]["loglevel"])
127 self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))
128
129 # check version of N2VC
130 # TODO enhance with int conversion or from distutils.version import LooseVersion
131 # or with list(map(int, version.split(".")))
132 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
133 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
134 n2vc_version, min_n2vc_version))
135 # check version of common
136 if versiontuple(common_version) < versiontuple(min_common_version):
137 raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
138 common_version, min_common_version))
139
140 try:
141 # TODO check database version
142 if config["database"]["driver"] == "mongo":
143 self.db = dbmongo.DbMongo()
144 self.db.db_connect(config["database"])
145 elif config["database"]["driver"] == "memory":
146 self.db = dbmemory.DbMemory()
147 self.db.db_connect(config["database"])
148 else:
149 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
150 config["database"]["driver"]))
151
152 if config["storage"]["driver"] == "local":
153 self.fs = fslocal.FsLocal()
154 self.fs.fs_connect(config["storage"])
155 elif config["storage"]["driver"] == "mongo":
156 self.fs = fsmongo.FsMongo()
157 self.fs.fs_connect(config["storage"])
158 else:
159 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
160 config["storage"]["driver"]))
161
162 # copy message configuration in order to remove 'group_id' for msg_admin
163 config_message = config["message"].copy()
164 config_message["loop"] = self.loop
165 if config_message["driver"] == "local":
166 self.msg = msglocal.MsgLocal()
167 self.msg.connect(config_message)
168 self.msg_admin = msglocal.MsgLocal()
169 config_message.pop("group_id", None)
170 self.msg_admin.connect(config_message)
171 elif config_message["driver"] == "kafka":
172 self.msg = msgkafka.MsgKafka()
173 self.msg.connect(config_message)
174 self.msg_admin = msgkafka.MsgKafka()
175 config_message.pop("group_id", None)
176 self.msg_admin.connect(config_message)
177 else:
178 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
179 config["message"]["driver"]))
180 except (DbException, FsException, MsgException) as e:
181 self.logger.critical(str(e), exc_info=True)
182 raise LcmException(str(e))
183
184 # contains created tasks/futures to be able to cancel
185 self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
186
187 self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
188 self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
189 self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
190 self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
191 self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
192 self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
193 self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
194
195 async def check_RO_version(self):
196 tries = 14
197 last_error = None
198 while True:
199 try:
200 ro_server = ROclient.ROClient(self.loop, **self.config["ro_config"])
201 ro_version = await ro_server.get_version()
202 if versiontuple(ro_version) < versiontuple(min_RO_version):
203 raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
204 ro_version, min_RO_version))
205 self.logger.info("Connected to RO version {}".format(ro_version))
206 return
207 except ROclient.ROClientException as e:
208 tries -= 1
209 error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["endpoint_url"],
210 e)
211 if tries <= 0:
212 self.logger.critical(error_text)
213 raise LcmException(error_text)
214 if last_error != error_text:
215 last_error = error_text
216 self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
217 await asyncio.sleep(5)
218
219 async def test(self, param=None):
220 self.logger.debug("Starting/Ending test task: {}".format(param))
221
222 async def kafka_ping(self):
223 self.logger.debug("Task kafka_ping Enter")
224 consecutive_errors = 0
225 first_start = True
226 kafka_has_received = False
227 self.pings_not_received = 1
228 while True:
229 try:
230 await self.msg_admin.aiowrite(
231 "admin", "ping",
232 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
233 self.loop)
234 # time between pings are low when it is not received and at starting
235 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
236 if not self.pings_not_received:
237 kafka_has_received = True
238 self.pings_not_received += 1
239 await asyncio.sleep(wait_time, loop=self.loop)
240 if self.pings_not_received > 10:
241 raise LcmException("It is not receiving pings from Kafka bus")
242 consecutive_errors = 0
243 first_start = False
244 except LcmException:
245 raise
246 except Exception as e:
247 # if not first_start is the first time after starting. So leave more time and wait
248 # to allow kafka starts
249 if consecutive_errors == 8 if not first_start else 30:
250 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
251 raise
252 consecutive_errors += 1
253 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
254 wait_time = 2 if not first_start else 5
255 await asyncio.sleep(wait_time, loop=self.loop)
256
257 def kafka_read_callback(self, topic, command, params):
258 order_id = 1
259
260 if topic != "admin" and command != "ping":
261 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
262 self.consecutive_errors = 0
263 self.first_start = False
264 order_id += 1
265 if command == "exit":
266 raise LcmExceptionExit
267 elif command.startswith("#"):
268 return
269 elif command == "echo":
270 # just for test
271 print(params)
272 sys.stdout.flush()
273 return
274 elif command == "test":
275 asyncio.Task(self.test(params), loop=self.loop)
276 return
277
278 if topic == "admin":
279 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
280 if params.get("worker_id") != self.worker_id:
281 return
282 self.pings_not_received = 0
283 try:
284 with open(health_check_file, "w") as f:
285 f.write(str(time()))
286 except Exception as e:
287 self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
288 return
289 elif topic == "k8scluster":
290 if command == "create" or command == "created":
291 k8scluster_id = params.get("_id")
292 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
293 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
294 return
295 elif command == "delete" or command == "deleted":
296 k8scluster_id = params.get("_id")
297 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
298 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
299 return
300 elif topic == "k8srepo":
301 if command == "create" or command == "created":
302 k8srepo_id = params.get("_id")
303 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
304 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
305 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
306 return
307 elif command == "delete" or command == "deleted":
308 k8srepo_id = params.get("_id")
309 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
310 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
311 return
312 elif topic == "ns":
313 if command == "instantiate":
314 # self.logger.debug("Deploying NS {}".format(nsr_id))
315 nslcmop = params
316 nslcmop_id = nslcmop["_id"]
317 nsr_id = nslcmop["nsInstanceId"]
318 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
319 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
320 return
321 elif command == "terminate":
322 # self.logger.debug("Deleting NS {}".format(nsr_id))
323 nslcmop = params
324 nslcmop_id = nslcmop["_id"]
325 nsr_id = nslcmop["nsInstanceId"]
326 self.lcm_tasks.cancel(topic, nsr_id)
327 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
328 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
329 return
330 elif command == "action":
331 # self.logger.debug("Update NS {}".format(nsr_id))
332 nslcmop = params
333 nslcmop_id = nslcmop["_id"]
334 nsr_id = nslcmop["nsInstanceId"]
335 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
336 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
337 return
338 elif command == "scale":
339 # self.logger.debug("Update NS {}".format(nsr_id))
340 nslcmop = params
341 nslcmop_id = nslcmop["_id"]
342 nsr_id = nslcmop["nsInstanceId"]
343 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
344 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
345 return
346 elif command == "show":
347 nsr_id = params
348 try:
349 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
350 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
351 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
352 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
353 db_nsr["detailed-status"],
354 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
355 except Exception as e:
356 print("nsr {} not found: {}".format(nsr_id, e))
357 sys.stdout.flush()
358 return
359 elif command == "deleted":
360 return # TODO cleaning of task just in case should be done
361 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
362 return
363 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
364 if command == "instantiate":
365 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
366 nsilcmop = params
367 nsilcmop_id = nsilcmop["_id"] # slice operation id
368 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
369 task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
370 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
371 return
372 elif command == "terminate":
373 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
374 nsilcmop = params
375 nsilcmop_id = nsilcmop["_id"] # slice operation id
376 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
377 self.lcm_tasks.cancel(topic, nsir_id)
378 task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
379 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
380 return
381 elif command == "show":
382 nsir_id = params
383 try:
384 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
385 print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
386 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
387 "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
388 db_nsir["detailed-status"],
389 db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
390 except Exception as e:
391 print("nsir {} not found: {}".format(nsir_id, e))
392 sys.stdout.flush()
393 return
394 elif command == "deleted":
395 return # TODO cleaning of task just in case should be done
396 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
397 return
398 elif topic == "vim_account":
399 vim_id = params["_id"]
400 if command in ("create", "created"):
401 task = asyncio.ensure_future(self.vim.create(params, order_id))
402 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
403 return
404 elif command == "delete" or command == "deleted":
405 self.lcm_tasks.cancel(topic, vim_id)
406 task = asyncio.ensure_future(self.vim.delete(params, order_id))
407 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
408 return
409 elif command == "show":
410 print("not implemented show with vim_account")
411 sys.stdout.flush()
412 return
413 elif command in ("edit", "edited"):
414 task = asyncio.ensure_future(self.vim.edit(params, order_id))
415 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
416 return
417 elif command == "deleted":
418 return # TODO cleaning of task just in case should be done
419 elif topic == "wim_account":
420 wim_id = params["_id"]
421 if command in ("create", "created"):
422 task = asyncio.ensure_future(self.wim.create(params, order_id))
423 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
424 return
425 elif command == "delete" or command == "deleted":
426 self.lcm_tasks.cancel(topic, wim_id)
427 task = asyncio.ensure_future(self.wim.delete(params, order_id))
428 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
429 return
430 elif command == "show":
431 print("not implemented show with wim_account")
432 sys.stdout.flush()
433 return
434 elif command in ("edit", "edited"):
435 task = asyncio.ensure_future(self.wim.edit(params, order_id))
436 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
437 return
438 elif command == "deleted":
439 return # TODO cleaning of task just in case should be done
440 elif topic == "sdn":
441 _sdn_id = params["_id"]
442 if command in ("create", "created"):
443 task = asyncio.ensure_future(self.sdn.create(params, order_id))
444 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
445 return
446 elif command == "delete" or command == "deleted":
447 self.lcm_tasks.cancel(topic, _sdn_id)
448 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
449 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
450 return
451 elif command in ("edit", "edited"):
452 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
453 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
454 return
455 elif command == "deleted":
456 return # TODO cleaning of task just in case should be done
457 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
458
459 async def kafka_read(self):
460 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
461 # future = asyncio.Future()
462 self.consecutive_errors = 0
463 self.first_start = True
464 while self.consecutive_errors < 10:
465 try:
466 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
467 topics_admin = ("admin", )
468 await asyncio.gather(
469 self.msg.aioread(topics, self.loop, self.kafka_read_callback),
470 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
471 )
472
473 except LcmExceptionExit:
474 self.logger.debug("Bye!")
475 break
476 except Exception as e:
477 # if not first_start is the first time after starting. So leave more time and wait
478 # to allow kafka starts
479 if self.consecutive_errors == 8 if not self.first_start else 30:
480 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
481 raise
482 self.consecutive_errors += 1
483 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
484 wait_time = 2 if not self.first_start else 5
485 await asyncio.sleep(wait_time, loop=self.loop)
486
487 # self.logger.debug("Task kafka_read terminating")
488 self.logger.debug("Task kafka_read exit")
489
490 def start(self):
491
492 # check RO version
493 self.loop.run_until_complete(self.check_RO_version())
494
495 self.loop.run_until_complete(asyncio.gather(
496 self.kafka_read(),
497 self.kafka_ping()
498 ))
499 # TODO
500 # self.logger.debug("Terminating cancelling creation tasks")
501 # self.lcm_tasks.cancel("ALL", "create")
502 # timeout = 200
503 # while self.is_pending_tasks():
504 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
505 # await asyncio.sleep(2, loop=self.loop)
506 # timeout -= 2
507 # if not timeout:
508 # self.lcm_tasks.cancel("ALL", "ALL")
509 self.loop.close()
510 self.loop = None
511 if self.db:
512 self.db.db_disconnect()
513 if self.msg:
514 self.msg.disconnect()
515 if self.msg_admin:
516 self.msg_admin.disconnect()
517 if self.fs:
518 self.fs.fs_disconnect()
519
520 def read_config_file(self, config_file):
521 # TODO make a [ini] + yaml inside parser
522 # the configparser library is not suitable, because it does not admit comments at the end of line,
523 # and not parse integer or boolean
524 try:
525 # read file as yaml format
526 with open(config_file) as f:
527 conf = yaml.load(f, Loader=yaml.Loader)
528 # Ensure all sections are not empty
529 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
530 if not conf.get(k):
531 conf[k] = {}
532
533 # read all environ that starts with OSMLCM_
534 for k, v in environ.items():
535 if not k.startswith("OSMLCM_"):
536 continue
537 subject, _, item = k[7:].lower().partition("_")
538 if not item:
539 continue
540 if subject in ("ro", "vca"):
541 # put in capital letter
542 subject = subject.upper()
543 try:
544 if item == "port" or subject == "timeout":
545 conf[subject][item] = int(v)
546 else:
547 conf[subject][item] = v
548 except Exception as e:
549 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
550
551 # backward compatibility of VCA parameters
552
553 if 'pubkey' in conf["VCA"]:
554 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
555 if 'cacert' in conf["VCA"]:
556 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
557 if 'apiproxy' in conf["VCA"]:
558 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
559
560 if 'enableosupgrade' in conf["VCA"]:
561 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
562 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
563 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
564 conf["VCA"]['enable_os_upgrade'] = False
565 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
566 conf["VCA"]['enable_os_upgrade'] = True
567
568 if 'aptmirror' in conf["VCA"]:
569 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
570
571 return conf
572 except Exception as e:
573 self.logger.critical("At config file '{}': {}".format(config_file, e))
574 exit(1)
575
576 @staticmethod
577 def get_process_id():
578 """
579 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
580 will provide a random one
581 :return: Obtained ID
582 """
583 # Try getting docker id. If fails, get pid
584 try:
585 with open("/proc/self/cgroup", "r") as f:
586 text_id_ = f.readline()
587 _, _, text_id = text_id_.rpartition("/")
588 text_id = text_id.replace('\n', '')[:12]
589 if text_id:
590 return text_id
591 except Exception:
592 pass
593 # Return a random id
594 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
595
596
597 def usage():
598 print("""Usage: {} [options]
599 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
600 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
601 -h|--help: shows this help
602 """.format(sys.argv[0]))
603 # --log-socket-host HOST: send logs to this host")
604 # --log-socket-port PORT: send logs using this port (default: 9022)")
605
606
607 def health_check():
608 retry = 2
609 while retry:
610 retry -= 1
611 try:
612 with open(health_check_file, "r") as f:
613 last_received_ping = f.read()
614
615 if time() - float(last_received_ping) < Lcm.ping_interval_pace + 10:
616 exit(0)
617 except Exception:
618 pass
619 if retry:
620 sleep(6)
621 exit(1)
622
623
624 if __name__ == '__main__':
625
626 try:
627 print("SYS.PATH='{}'".format(sys.path))
628 # load parameters and configuration
629 # -h
630 # -c value
631 # --config value
632 # --help
633 # --health-check
634 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
635 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
636 config_file = None
637 for o, a in opts:
638 if o in ("-h", "--help"):
639 usage()
640 sys.exit()
641 elif o in ("-c", "--config"):
642 config_file = a
643 elif o == "--health-check":
644 health_check()
645 # elif o == "--log-socket-port":
646 # log_socket_port = a
647 # elif o == "--log-socket-host":
648 # log_socket_host = a
649 # elif o == "--log-file":
650 # log_file = a
651 else:
652 assert False, "Unhandled option"
653
654 if config_file:
655 if not path.isfile(config_file):
656 print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
657 exit(1)
658 else:
659 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
660 if path.isfile(config_file):
661 break
662 else:
663 print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
664 exit(1)
665 lcm = Lcm(config_file)
666 lcm.start()
667 except (LcmException, getopt.GetoptError) as e:
668 print(str(e), file=sys.stderr)
669 # usage()
670 exit(1)