Merge branch feature7928. It includes feature5837
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time, sleep
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
# Debugging aid: when the environment variable OSMLCM_PDB_DEBUG is defined (with any value),
# drop into the pdb debugger as soon as this module is loaded.
if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of the components LCM depends on. They are compared with versiontuple()
# in Lcm.__init__ (N2VC, common) and Lcm.check_RO_version (RO).
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
# lcm_version and lcm_version_date are normally imported from the osm_lcm package (__init__.py).
# Uncomment the two lines below to hard-code them instead.
# lcm_version = '0.1.41'
# lcm_version_date = '2019-06-19'
# File where Lcm.kafka_read_callback stores the timestamp of the last ping received by this worker;
# health_check() reads it back to decide whether the process is healthy.
health_check_file = path.expanduser("~") + "/time_last_ping"  # TODO find better location for this file
63
64
class Lcm:
    """
    Life Cycle Manager main class.

    It loads the configuration, connects to the database, file storage and message bus, and
    dispatches every message read from the bus to the proper handler (ns, netslice, vim, wim, sdn,
    k8scluster, k8srepo). It also sends periodic pings to itself through the bus, which are used as
    health check.
    """

    ping_interval_pace = 120  # seconds between pings once it is confirmed that pings are being received
    ping_interval_boot = 5    # seconds between pings while booting (no ping received yet)

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the YAML configuration file. Its content must be a two level dictionary
            with, at least, the sections 'global', 'RO', 'VCA', 'database', 'storage' and 'message'
        :param loop: asyncio event loop to use. If not provided, asyncio.get_event_loop() is used
        :return: None
        :raises LcmException: on incompatible N2VC/common versions, invalid driver names, or when the
            connection to database, storage or message bus fails
        """

        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_config = {
            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.ROclient",
            "loglevel": "ERROR",
        }

        self.vca_config = config["VCA"]

        self.loop = loop or asyncio.get_event_loop()

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules. The logger name is injected in each section so that the driver
        # receiving that section at connect time can find it under 'logger_name'
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            if config["database"]["driver"] == "mongo":
                self.db = dbmongo.DbMongo()
                self.db.db_connect(config["database"])
            elif config["database"]["driver"] == "memory":
                self.db = dbmemory.DbMemory()
                self.db.db_connect(config["database"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    config["database"]["driver"]))

            if config["storage"]["driver"] == "local":
                self.fs = fslocal.FsLocal()
                self.fs.fs_connect(config["storage"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    config["storage"]["driver"]))

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
                                             self.vca_config, self.loop)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)

    @staticmethod
    def _max_consecutive_errors(first_start):
        """
        Provide how many consecutive errors are tolerated by the kafka tasks before giving up
        :param first_start: True while no successful iteration has happened yet since the task started.
            In that case more errors are tolerated in order to allow kafka to start
        :return: 30 if first_start, 8 otherwise
        """
        return 30 if first_start else 8

    async def check_RO_version(self):
        """
        Connect to RO and check that its version is at least min_RO_version. A failing connection
        is retried every 5 seconds, up to 14 attempts
        :return: None
        :raises LcmException: if RO version is too old, or if RO cannot be contacted after all the attempts
        """
        tries = 14
        last_error = None
        while True:
            try:
                ro_server = ROclient.ROClient(self.loop, **self.ro_config)
                ro_version = await ro_server.get_version()
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {}".format(ro_version))
                return
            except ROclient.ROClientException as e:
                tries -= 1
                error_text = "Error while connecting to RO on {}: {}".format(self.ro_config["endpoint_url"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # log only when the error changes, to avoid repeating the same line at every retry
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        """
        Dummy task launched by the 'test' command. It just logs the received parameter
        :param param: anything, it is only logged
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        """
        Endless task that periodically writes a ping addressed to this same worker at topic 'admin'.
        kafka_read_callback resets self.pings_not_received when the ping comes back; if more than 10
        pings in a row are not received, the task ends with an exception
        :return: never returns normally
        :raises LcmException: when pings are not being received from the bus
        :raises Exception: re-raises the last writing error after too many consecutive errors
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # time between pings are low when it is not received and at starting
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10 and
                # inside a coroutine the running loop is used anyway
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # while first_start is True no ping has been written successfully yet, so more errors
                # are tolerated and a longer time is waited in order to allow kafka to start
                if consecutive_errors >= self._max_consecutive_errors(first_start):
                    self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time)

    def kafka_read_callback(self, topic, command, params):
        """
        Callback invoked for every message read from the bus (see kafka_read). It launches, and registers
        at self.lcm_tasks, the asyncio task that attends the operation
        :param topic: topic of the message: 'admin', 'ns', 'nsi', 'vim_account', 'wim_account', 'sdn',
            'k8scluster' or 'k8srepo'
        :param command: operation requested over the topic
        :param params: content of the message. Its format depends on topic and command
        :return: None
        :raises LcmExceptionExit: when command 'exit' is received, in order to finish kafka_read
        """
        # NOTE(review): order_id is a local variable, so after the increment below it is always 2 for
        # every message. Confirm whether a per-worker incremental counter was intended
        order_id = 1

        # NOTE(review): this condition also skips the log of any other command at topic 'admin' and of
        # 'ping' commands at other topics. Confirm whether only 'admin'/'ping' should be silenced
        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        # a message has been read, so the reading is working: reset the error accounting of kafka_read
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        # NOTE(review): at several topics below the past-tense commands ('instantiated', 'terminated',
        # 'deleted', ...) are attended by the first branch as if they were requests, which makes the later
        # branches that just ignore them unreachable. Behavior is kept as it was; confirm the intention
        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                # store the time of this ping, it is what health_check() inspects
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate" or command == "instantiated":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate" or command == "terminated":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel whatever is running over this ns before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): self.lcm_ns_tasks is not defined anywhere in this class (only
                    # self.lcm_tasks), so this raises AttributeError and "not found" is printed. It must be
                    # replaced by the proper TaskRegistry query
                    print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate" or command == "instantiated":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate" or command == "terminated":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): self.lcm_netslice_tasks is not defined anywhere in this class, see the
                    # equivalent note at topic 'ns'
                    print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        """
        Task that reads from the message bus, both the operation topics and the 'admin' topic, and
        attends each message with kafka_read_callback. Reading is retried on error
        :return: None, when command 'exit' is received
        :raises Exception: re-raises the last reading error after too many consecutive errors
        """
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        # The loop only finishes with 'exit' command (LcmExceptionExit) or by raising after too many
        # errors. A fixed bound of 10 errors here would silently cut short the 30 errors allowed at boot
        while True:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # while first_start is True no message has been read yet (kafka_read_callback clears it),
                # so more errors are tolerated and a longer time is waited in order to allow kafka to start
                if self.consecutive_errors >= self._max_consecutive_errors(self.first_start):
                    self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
                await asyncio.sleep(wait_time)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):
        """
        Run LCM: check RO version, run the kafka_read and kafka_ping tasks until they finish, and then
        close the loop and disconnect from database, message bus and storage
        :return: None
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        """
        Load the YAML configuration file and override its values with the environment variables named
        OSMLCM_<section>_<...>_<key>. For example OSMLCM_RO_HOST overrides conf["RO"]["host"]. Sections
        'ro' and 'vca' are converted to capital letters, and keys named 'port' are converted to integer.
        Environment variables that do not match an existing section are skipped with a warning.
        On any error reading the file, it is logged and the process exits with code 1
        :param config_file: path of the configuration file
        :return: dictionary with the configuration
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # NOTE(review): yaml.Loader can build arbitrary python objects. It is only acceptable
                # because the configuration file is trusted; consider yaml.safe_load
                conf = yaml.load(f, Loader=yaml.Loader)
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                k_items = k.lower().split("_")
                if len(k_items) < 3:
                    continue
                if k_items[1] in ("ro", "vca"):
                    # put in capital letter
                    k_items[1] = k_items[1].upper()
                c = conf
                try:
                    # walk down to the dictionary that contains the key to override
                    for k_item in k_items[1:-1]:
                        c = c[k_item]
                    if k_items[-1] == "port":
                        c[k_items[-1]] = int(v)
                    else:
                        c[k_items[-1]] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            sys.exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
        will provide a random one
        :return: Obtained ID
        """
        # Try getting docker id: up to 12 characters of the text after the last '/' of the first line
        # of /proc/self/cgroup. If it fails or it is empty, a random id is generated
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            # best effort: the file does not exist when not running over linux
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
574
575
def usage():
    """Print the command line help of this program at standard output."""
    help_text = """Usage: {} [options]
-c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
--health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
-h|--help: shows this help
"""
    program_name = sys.argv[0]
    print(help_text.format(program_name))
    # Options not implemented yet:
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
584
585
def health_check():
    """
    Determine if LCM is healthy by inspecting the timestamp stored at health_check_file, and finish the
    process: exit code 0 if the last ping is newer than Lcm.ping_interval_pace + 10 seconds, exit code 1
    otherwise. The file is inspected twice, waiting 6 seconds in between, before reporting a failure.
    This function never returns
    """
    for attempts_left in (1, 0):
        try:
            with open(health_check_file, "r") as ping_file:
                last_received_ping = ping_file.read()

            ping_age = time() - float(last_received_ping)
            is_healthy = ping_age < Lcm.ping_interval_pace + 10
        except Exception:
            # file missing, unreadable or with a wrong content: same as not having received the ping
            is_healthy = False
        if is_healthy:
            exit(0)
        if attempts_left:
            sleep(6)
    exit(1)
601
602
if __name__ == '__main__':

    try:
        print("SYS.PATH='{}'".format(sys.path))
        # Accepted command line parameters:
        # -h | --help
        # -c value | --config value
        # --health-check
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        config_file = None
        for option, value in opts:
            if option in ("-h", "--help"):
                usage()
                sys.exit()
            elif option in ("-c", "--config"):
                config_file = value
            elif option == "--health-check":
                # health_check() finishes the process by itself
                health_check()
            # elif option == "--log-socket-port":
            #     log_socket_port = value
            # elif option == "--log-socket-host":
            #     log_socket_host = value
            # elif option == "--log-file":
            #     log_file = value
            else:
                assert False, "Unhandled option"

        if config_file:
            # a file explicitly provided must exist
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            # look for the file at the default locations: first one found is used
            default_locations = (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg")
            config_file = next((location for location in default_locations if path.isfile(location)), None)
            if config_file is None:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)