LCM Helm connector integration
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time, sleep
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
# Optional interactive debugging: exporting OSMLCM_PDB_DEBUG (any value) stops in pdb while importing this module
if "OSMLCM_PDB_DEBUG" in os.environ:
    pdb.set_trace()


__author__ = "Alfonso Tierno"

# Minimum versions of the components LCM depends on. RO is checked in Lcm.check_RO_version;
# N2VC and osm_common are checked in Lcm.__init__
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"

# uncomment if LCM is installed as library and installed, and get them from __init__.py
# lcm_version = '0.1.41'
# lcm_version_date = '2019-06-19'

# Timestamp file written by a running LCM each time it receives its own ping; read by --health-check
health_check_file = "{}/time_last_ping".format(path.expanduser("~"))  # TODO find better location for this file
63
64
class Lcm:
    """
    OSM LCM worker. Connects to database, storage and message bus, and dispatches the orders read from the
    message bus to the ns, netslice, vim/wim/sdn and k8s cluster/repo handlers.
    """

    ping_interval_pace = 120  # seconds between self-pings once a ping has been received back
    ping_interval_boot = 5  # seconds between self-pings while booting (no ping received back yet)
    # consecutive message bus errors tolerated by kafka_read/kafka_ping before giving up
    max_consecutive_errors = 8
    max_consecutive_errors_boot = 30  # larger while booting, to give kafka time to start

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the yaml configuration file. Its top level must contain the 'global',
            'database', 'storage', 'message', 'RO' and 'VCA' sections
        :param loop: asyncio event loop to use. The current event loop is used when None
        :return: None
        :raises LcmException: on incompatible N2VC/common versions, invalid drivers or connection errors
        """

        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_config = {
            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.ROclient",
            "loglevel": "ERROR",
        }

        self.vca_config = config["VCA"]

        self.loop = loop or asyncio.get_event_loop()

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules. 'logger_name' is also passed along to the db/fs/msg connect() calls below
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            if config["database"]["driver"] == "mongo":
                self.db = dbmongo.DbMongo()
                self.db.db_connect(config["database"])
            elif config["database"]["driver"] == "memory":
                self.db = dbmemory.DbMemory()
                self.db.db_connect(config["database"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    config["database"]["driver"]))

            if config["storage"]["driver"] == "local":
                self.fs = fslocal.FsLocal()
                self.fs.fs_connect(config["storage"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    config["storage"]["driver"]))

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
                                             self.vca_config, self.loop)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config,
                                                self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)

    async def check_RO_version(self):
        """
        Connect to RO and check that its version is at least min_RO_version.
        Connection errors are retried every 5 seconds, up to 14 attempts.
        :raises LcmException: if RO is not reachable after all attempts, or its version is not compatible
        """
        tries = 14
        last_error = None
        while True:
            try:
                ro_server = ROclient.ROClient(self.loop, **self.ro_config)
                ro_version = await ro_server.get_version()
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {}".format(ro_version))
                return
            except ROclient.ROClientException as e:
                tries -= 1
                error_text = "Error while connecting to RO on {}: {}".format(self.ro_config["endpoint_url"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # log each distinct error only once, to avoid flooding the log while RO starts
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        """Dummy task launched by the "test" command; only logs its parameter."""
        self.logger.debug("Starting/Ending test task: {}".format(param))

    @classmethod
    def _too_many_errors(cls, consecutive_errors, first_start):
        """
        Tell if a message bus task must give up after 'consecutive_errors' failures in a row.
        While booting (first_start) more errors are tolerated, to allow kafka to start.
        """
        limit = cls.max_consecutive_errors_boot if first_start else cls.max_consecutive_errors
        return consecutive_errors >= limit

    async def kafka_ping(self):
        """
        Periodically send a ping addressed to this worker through the admin topic. kafka_read_callback
        resets self.pings_not_received when it receives it back.
        :raises LcmException: when more than 10 pings in a row are not received back
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # time between pings are low when it is not received and at starting
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # while first_start (no successful ping yet) more errors are tolerated to allow kafka to start
                if self._too_many_errors(consecutive_errors, first_start):
                    self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time)

    def _get_registered_tasks(self, topic, _id):
        """
        Return the tasks registered at self.lcm_tasks for this topic and _id, or None if not available.
        NOTE(review): assumes TaskRegistry keeps a 'task_registry' dict shaped {topic: {_id: ...}}; confirm
        against osm_lcm.lcm_utils. It falls back to None so that a "show" command never fails because of it.
        """
        registry = getattr(self.lcm_tasks, "task_registry", None)
        if not isinstance(registry, dict):
            return None
        return registry.get(topic, {}).get(_id)

    def kafka_read_callback(self, topic, command, params):
        """
        Process one message read from the message bus, launching and registering the corresponding task.
        :param topic: topic the message was read from, e.g. "ns", "nsi", "vim_account", "admin"
        :param command: message key, i.e. the order to perform, e.g. "instantiate", "delete", "ping"
        :param params: message content. For most orders a dict with at least '_id'; for "show" the record id
        :return: None
        :raises LcmExceptionExit: when command is "exit", to make kafka_read terminate
        """
        # NOTE(review): order_id is always 2 on every call; it is passed as the operation id to the
        # vim/wim/sdn/k8s handlers and to lcm_tasks.register - confirm this is intended
        order_id = 1

        if not (topic == "admin" and command == "ping"):  # do not flood the log with periodic self-pings
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        # a message has been received, so the bus works: reset the error counters used by kafka_read
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            self.loop.create_task(self.test(params))
            return

        if topic == "admin":
            if command == "ping" and params.get("to") == "lcm" and params.get("from") == "lcm":
                # the admin topic is read by every worker; only our own pings count
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate" or command == "instantiated":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate" or command == "terminated":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any pending task of this ns before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self._get_registered_tasks("ns", nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate" or command == "instantiated":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate" or command == "terminated":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self._get_registered_tasks("nsi", nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command == "create" or command == "created":
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command == "edit" or command == "edited":
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command == "create" or command == "created":
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command == "edit" or command == "edited":
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command == "create" or command == "created":
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command == "edit" or command == "edited":
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        """
        Read orders from the message bus topics, passing each one to kafka_read_callback, until an "exit"
        order arrives. Bus errors are retried; after too many consecutive errors the exception is re-raised.
        """
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        self.consecutive_errors = 0
        self.first_start = True
        topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
        topics_admin = ("admin", )
        # exits only on "exit" order (break) or when too many consecutive errors (raise)
        while True:
            try:
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )
            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # while first_start (no message received yet) more errors are tolerated to allow kafka to start
                if self._too_many_errors(self.consecutive_errors, self.first_start):
                    self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time)

        self.logger.debug("Task kafka_read exit")

    def start(self):
        """
        Check the RO version, then run kafka_read and kafka_ping until they finish. Afterwards close the event
        loop and disconnect database, message bus and storage.
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        """
        Load the yaml configuration file and apply environment overrides.
        An environment variable OSMLCM_<SECTION>[_<SUBSECTION>...]_<KEY> (at least three '_' separated items)
        sets conf[section]...[key]; sections 'ro' and 'vca' are upper-cased and 'port' keys are converted to int.
        Overrides that cannot be applied are skipped with a warning.
        :param config_file: path of the yaml configuration file
        :return: configuration dictionary
        Exits the process with code 1 if the file cannot be read or parsed.
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # safe_load: only plain data is needed. yaml.load without Loader is unsafe and PyYAML >= 6 rejects it
                conf = yaml.safe_load(f)
            if not isinstance(conf, dict):
                raise ValueError("configuration must be a yaml mapping")
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                k_items = k.lower().split("_")
                if len(k_items) < 3:
                    continue
                if k_items[1] in ("ro", "vca"):
                    # put in capital letter
                    k_items[1] = k_items[1].upper()
                c = conf
                try:
                    for k_item in k_items[1:-1]:
                        c = c[k_item]
                    if k_items[-1] == "port":
                        c[k_items[-1]] = int(v)
                    else:
                        c[k_items[-1]] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            sys.exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
        will provide a random one
        :return: Obtained ID
        """
        # Try getting docker id: last path component of the first /proc/self/cgroup line, truncated to 12 chars
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
568
569
def usage():
    """Print the command line help of this script to stdout."""
    print("""Usage: {} [options]
        -c|--config configuration_file: loads this configuration file. By default the first existing one of
            <script name>.cfg (next to this script), ./lcm.cfg or /etc/osm/lcm.cfg is used
        --health-check: do not run lcm; check whether a running lcm has recently received back its own
            kafka ping (timestamp kept at ~/time_last_ping). Exits with 0 if healthy, 1 otherwise
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # TODO options not implemented yet:
    # --log-socket-host HOST: send logs to this host
    # --log-socket-port PORT: send logs using this port (default: 9022)
578
579
def health_check(file_path=None, max_age=None, retries=2, retry_wait=6):
    """
    Exit the process with 0 if a running LCM recently received back its own ping, with 1 otherwise.
    The running LCM writes the time of the last received ping into health_check_file.
    :param file_path: timestamp file to inspect. Default: health_check_file
    :param max_age: maximum accepted age in seconds of that timestamp. Default: Lcm.ping_interval_pace + 10
    :param retries: number of checks before declaring unhealthy
    :param retry_wait: seconds to wait between checks
    :return: never returns; always calls sys.exit
    """
    if file_path is None:
        file_path = health_check_file
    if max_age is None:
        max_age = Lcm.ping_interval_pace + 10
    while retries > 0:
        retries -= 1
        try:
            with open(file_path, "r") as f:
                last_received_ping = float(f.read())
        except (OSError, ValueError):
            # missing/unreadable file or not a number: treated as not healthy for this check
            last_received_ping = None
        if last_received_ping is not None and time() - last_received_ping < max_age:
            sys.exit(0)
        if retries > 0:
            sleep(retry_wait)
    sys.exit(1)
595
596
if __name__ == '__main__':

    try:
        # load parameters and configuration
        # -h | --help
        # -c value | --config value
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                health_check()  # always exits the process
            else:
                # getopt already rejects unknown options; raise explicitly instead of 'assert' (stripped by -O)
                raise getopt.GetoptError("Unhandled option '{}'".format(o))

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                sys.exit(1)
        else:
            # default search order: <this script>.cfg, ./lcm.cfg, /etc/osm/lcm.cfg
            for config_file in (path.splitext(__file__)[0] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                sys.exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)
641 # usage()
642 exit(1)