Get version from installed package if available instead of code
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time, sleep
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as _lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
# Debugging aid: when the OSMLCM_PDB_DEBUG environment variable is defined
# (with any value, even empty), stop in the Python debugger at import time.
if 'OSMLCM_PDB_DEBUG' in os.environ:
    pdb.set_trace()
52
53
__author__ = "Alfonso Tierno"
# Minimum versions of the companion components; Lcm compares the versions it
# finds against these with versiontuple() and raises LcmException when older.
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
# Version logged at start-up and sent inside every "ping" message. It starts as
# the version declared by the osm_lcm package and may be replaced by
# _get_version() with the version of the installed distribution.
lcm_version = _lcm_version
# File where the timestamp of the last received ping is written; it is read
# back by health_check().
health_check_file = path.expanduser("~") + "/time_last_ping"   # TODO find better location for this file
61
62
class Lcm:
    """
    Life Cycle Manager main object.

    It loads the configuration, connects to database, file storage and message bus, creates the
    topic handlers (ns, netslice, vim, wim, sdn, k8scluster, k8srepo) and dispatches every message
    read from the bus to the proper handler as an asyncio task.
    """

    ping_interval_pace = 120  # seconds between pings once it is confirmed that all is running
    ping_interval_boot = 5    # seconds between pings while booting

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the configuration file. Once loaded it must be a two level dictionary
            containing at least the sections 'global', 'database', 'storage', 'message', 'RO' and 'VCA'
        :param loop: asyncio event loop to use. If not provided asyncio.get_event_loop() is used
        :return: None
        :raises LcmException: if osm/N2VC or osm/common are older than required, if a configured driver
            is not valid or if the connection to database, storage or message bus fails
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration. NOTE: read_config_file terminates the process if the file cannot be loaded
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_config = {
            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.ROclient",
            "loglevel": "ERROR",
        }

        self.vca_config = config["VCA"]

        self.loop = loop or asyncio.get_event_loop()

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            if config["database"]["driver"] == "mongo":
                self.db = dbmongo.DbMongo()
                self.db.db_connect(config["database"])
            elif config["database"]["driver"] == "memory":
                self.db = dbmemory.DbMemory()
                self.db.db_connect(config["database"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    config["database"]["driver"]))

            if config["storage"]["driver"] == "local":
                self.fs = fslocal.FsLocal()
                self.fs.fs_connect(config["storage"])
            elif config["storage"]["driver"] == "mongo":
                self.fs = fsmongo.FsMongo()
                self.fs.fs_connect(config["storage"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    config["storage"]["driver"]))

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
                                             self.vca_config, self.loop)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)

    @staticmethod
    def _retries_exhausted(consecutive_errors, first_start):
        """
        Tell if a retry loop has reached its maximum number of consecutive errors
        :param consecutive_errors: number of errors got in a row up to now
        :param first_start: True while nothing has worked yet since starting. More errors (30 instead of 8)
            are allowed in this case, to leave time for kafka to start
        :return: True if no more retries must be done
        """
        max_errors = 30 if first_start else 8
        return consecutive_errors >= max_errors

    async def check_RO_version(self):
        """
        Connect to RO and check that its version is at least min_RO_version. Connection errors are retried
        every 5 seconds, up to 14 tries
        :return: None
        :raises LcmException: if RO version is older than required or RO cannot be contacted after all tries
        """
        tries = 14
        last_error = None
        while True:
            try:
                ro_server = ROclient.ROClient(self.loop, **self.ro_config)
                ro_version = await ro_server.get_version()
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {}".format(ro_version))
                return
            except ROclient.ROClientException as e:
                tries -= 1
                error_text = "Error while connecting to RO on {}: {}".format(self.ro_config["endpoint_url"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # log only when the error text changes, to avoid flooding the log with the same message
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        """
        Dummy task launched by the 'test' command. It just writes a debug log entry
        :param param: content to be logged
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        """
        Endless task that writes a 'ping' message addressed to this same worker at topic 'admin'.
        kafka_read_callback sets self.pings_not_received to 0 when the ping comes back.
        Pings are sent every ping_interval_boot seconds until the first one is received back, and every
        ping_interval_pace seconds afterwards.
        :raises LcmException: if more than 10 pings in a row are not received back
        :raises Exception: the last error got once the maximum of consecutive errors is reached
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # time between pings are low when it is not received and at starting
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10 and this
                # coroutine already runs inside self.loop
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if first_start, nothing has worked yet after starting. So leave more time and wait
                # to allow kafka starts
                if self._retries_exhausted(consecutive_errors, first_start):
                    self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time)

    def kafka_read_callback(self, topic, command, params):
        """
        Process one message read from the bus. For the lifecycle commands an asyncio task is created and
        registered at self.lcm_tasks
        :param topic: topic where the message was read: 'admin', 'k8scluster', 'k8srepo', 'ns', 'nsi',
            'vim_account', 'wim_account' or 'sdn'
        :param command: command (key) of the message
        :param params: content of the message. Its format depends on topic and command
        :return: None
        :raises LcmExceptionExit: when the command 'exit' is received
        """
        order_id = 1

        # NOTE(review): with 'and', nothing coming from topic 'admin' is logged, nor any 'ping' command.
        # Maybe 'not (topic == "admin" and command == "ping")' was intended; confirm
        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        # something has been received, so the reading from the bus is working
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                # pings sent by other workers are ignored
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                # store the time of this ping, to be read by the health check
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
                return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate" or command == "instantiated":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate" or command == "terminated":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel the pending tasks of this ns before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): 'lcm_ns_tasks' is not defined anywhere in this class, so it looks like
                    # this always ends at the 'not found' message below. Confirm and use self.lcm_tasks
                    print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            # only "scaled" and "actioned" can arrive here; the other two are caught by the branches above
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate" or command == "instantiated":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate" or command == "terminated":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                # cancel the pending tasks of this slice before terminating it
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): 'lcm_netslice_tasks' is not defined anywhere in this class, so it looks
                    # like this always ends at the 'not found' message below. Confirm and use self.lcm_tasks
                    print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            # only "scaled" and "actioned" can arrive here; the other two are caught by the branches above
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":  # never reached: "deleted" is caught by the delete branch above
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":  # never reached: "deleted" is caught by the delete branch above
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":  # never reached: "deleted" is caught by the delete branch above
                return  # TODO cleaning of task just in case should be done
        # any topic/command combination not returned above arrives here
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        """
        Task that reads from the bus, both the lifecycle topics and the 'admin' topic, and delivers every
        message to kafka_read_callback. It ends when the command 'exit' is received (LcmExceptionExit) or
        when self.consecutive_errors reaches 10
        :raises Exception: the last error got once the maximum of consecutive errors is reached
        """
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        # NOTE(review): this bound (10) is lower than the 30 errors allowed at first start, so at first
        # start the loop ends without raising after 10 errors. Confirm if this is intended
        while self.consecutive_errors < 10:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if first_start, nothing has been received yet after starting. So leave more time and wait
                # to allow kafka starts
                if self._retries_exhausted(self.consecutive_errors, self.first_start):
                    self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10 and this
                # coroutine already runs inside self.loop
                await asyncio.sleep(wait_time)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):
        """
        Run the LCM: check the RO version, run the tasks kafka_read and kafka_ping until both of them
        finish, and then close the event loop and disconnect from database, message bus and storage
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        """
        Load the yaml configuration file and overwrite its values with the environment variables that
        start with 'OSMLCM_'. The variable name, in lower case, is split by '_' to get the path inside the
        configuration; e.g. OSMLCM_RO_HOST sets conf["RO"]["host"]. 'ro' and 'vca' sections are converted
        to capital letters and a final item 'port' is converted to integer.
        The process is terminated with exit code 1 if the file cannot be loaded
        :param config_file: path of the configuration file
        :return: dictionary with the configuration
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # NOTE: yaml.Loader is able to build arbitrary python objects, so the configuration file
                # must come from a trusted source
                conf = yaml.load(f, Loader=yaml.Loader)
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                k_items = k.lower().split("_")
                if len(k_items) < 3:
                    continue
                if k_items[1] in ("ro", "vca"):
                    # put in capital letter
                    k_items[1] = k_items[1].upper()
                c = conf
                try:
                    # walk down the configuration up to the dictionary that holds the final item
                    for k_item in k_items[1:-1]:
                        c = c[k_item]
                    if k_items[-1] == "port":
                        c[k_items[-1]] = int(v)
                    else:
                        c[k_items[-1]] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
        will provide a random one
        :return: Obtained ID, a string of up to 12 characters
        """
        # Try getting docker id: text after the last '/' of the first line of the cgroup file, cut to 12
        # characters. If it fails or it is empty, a random id is generated
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
576
577
578 def _get_version():
579 """
580 Try to get version from package using pkg_resources (available with setuptools)
581 """
582 global lcm_version
583 try:
584 from pkg_resources import get_distribution
585 lcm_version = get_distribution("osm_lcm").version
586 except Exception:
587 pass
588
589
def usage():
    """
    Print the command line help at standard output. The program name is taken from sys.argv[0]
    """
    print("""Usage: {} [options]
-c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
--health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
-h|--help: shows this help
""".format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
598
599
def health_check():
    """
    Check the health of a running lcm and terminate the process: exit code 0 if the timestamp stored at
    health_check_file is newer than Lcm.ping_interval_pace + 10 seconds, exit code 1 otherwise.
    Two attempts are done, waiting 6 seconds between them
    """
    for attempts_left in (1, 0):
        try:
            with open(health_check_file, "r") as f:
                seconds_since_ping = time() - float(f.read())
            if seconds_since_ping < Lcm.ping_interval_pace + 10:
                exit(0)
        except Exception:
            # missing or unreadable file, or content that is not a number: count it as a failed attempt
            pass
        if attempts_left:
            sleep(6)
    exit(1)
615
616
if __name__ == '__main__':

    try:
        print("SYS.PATH='{}'".format(sys.path))
        # Command line switches that are accepted:
        #   -h | --help
        #   -c value | --config value
        #   --health-check
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        parsed_options, _positional = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        config_file = None
        for flag, value in parsed_options:
            if flag in ("-h", "--help"):
                usage()
                sys.exit()
            if flag in ("-c", "--config"):
                config_file = value
            elif flag == "--health-check":
                # health_check() terminates the process by itself
                health_check()
            # elif flag == "--log-socket-port":
            #     log_socket_port = value
            # elif flag == "--log-socket-host":
            #     log_socket_host = value
            # elif flag == "--log-file":
            #     log_file = value
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            # look for the first existing file among the default locations: same name as this file but
            # with extension '.cfg', the local folder and /etc/osm
            default_locations = (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg")
            config_file = next((location for location in default_locations if path.isfile(location)), None)
            if config_file is None:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                exit(1)
        # get version from package and update global lcm_version
        _get_version()
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)