de829bd5c3d2733b227ff78f3e7d1db3bf3c8373
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time, sleep
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
# Drop into the debugger at import time when OSMLCM_PDB_DEBUG is defined (any value, even empty)
if 'OSMLCM_PDB_DEBUG' in os.environ:
    pdb.set_trace()


__author__ = "Alfonso Tierno"

# Minimum versions of the other OSM components that this module accepts
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"

# Uncomment when LCM is not installed as a library, so that these values cannot be taken from __init__.py
# lcm_version = '0.1.41'
# lcm_version_date = '2019-06-19'

# File where the time of the last received self-ping is stored; it is read back by the --health-check option
health_check_file = "{}/time_last_ping".format(path.expanduser("~"))  # TODO find better location for this file
63
64
class Lcm:
    """
    Main LCM process object.

    It reads the configuration, connects to the database, the file storage and the message bus, and then
    dispatches every message received from the bus to the ns, netslice, vim, wim, sdn, k8scluster and k8srepo
    handlers as asyncio tasks. It also sends a periodic ping to itself through the bus to detect that the
    bus is working.
    """

    # seconds between two self-pings once a ping has been received back from the bus
    ping_interval_pace = 120
    # seconds between two self-pings while no ping has been received back yet (e.g. when booting)
    ping_interval_boot = 5

    def __init__(self, config_file, loop=None):
        """
        Load the configuration, set up logging and connect to database, file storage and message bus.

        :param config_file: path of the configuration file. It is loaded with read_config_file(). The sections
            used here are 'global', 'RO', 'VCA', 'database', 'storage' and 'message'.
        :param loop: asyncio event loop to use. When omitted, asyncio.get_event_loop() is used.
        :raises LcmException: if the N2VC or common library version is too old, if a 'driver' entry is not
            valid, or if connecting to database, storage or message bus fails
        :return: None
        """

        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_config = {
            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.ROclient",
            "loglevel": "ERROR",
        }

        self.vca_config = config["VCA"]

        self.loop = loop or asyncio.get_event_loop()

        # logging of this module
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        global_config = config["global"]
        if global_config.get("logfile"):
            self.logger.addHandler(self._rotating_file_handler(global_config["logfile"], log_formatter_simple))
        if not global_config.get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if global_config.get("loglevel"):
            self.logger.setLevel(global_config["loglevel"])

        # logging of other modules. The logger name is stored in the section passed later to each driver
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                logger_module.addHandler(self._rotating_file_handler(config[k1]["logfile"], log_formatter_simple))
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.DbMongo()
            elif db_driver == "memory":
                self.db = dbmemory.DbMemory()
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
            self.db.db_connect(config["database"])

            fs_driver = config["storage"]["driver"]
            if fs_driver == "local":
                self.fs = fslocal.FsLocal()
            elif fs_driver == "mongo":
                self.fs = fsmongo.FsMongo()
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
            self.fs.fs_connect(config["storage"])

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            msg_driver = config_message["driver"]
            if msg_driver == "local":
                msg_class = msglocal.MsgLocal
            elif msg_driver == "kafka":
                msg_class = msgkafka.MsgKafka
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
            self.msg = msg_class()
            self.msg.connect(config_message)
            self.msg_admin = msg_class()
            config_message.pop("group_id", None)
            self.msg_admin.connect(config_message)
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
                                             self.vca_config, self.loop)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)

    @staticmethod
    def _rotating_file_handler(filename, formatter):
        """
        Create the rotating log file handler used for every configured 'logfile'.
        :param filename: path of the log file
        :param formatter: logging.Formatter to attach to the handler
        :return: the configured logging.handlers.RotatingFileHandler
        """
        file_handler = logging.handlers.RotatingFileHandler(filename, maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(formatter)
        return file_handler

    @staticmethod
    def _max_consecutive_errors(first_start):
        """
        Provide how many consecutive errors of the message bus are tolerated before giving up.
        :param first_start: True while nothing has worked yet since the process started. More errors are
            tolerated in this case, to give time to the message bus to start.
        :return: 30 if first_start, 8 otherwise
        """
        return 30 if first_start else 8

    def _register_task(self, topic, item_id, op_id, task_name, coro):
        """
        Schedule a coroutine as an asyncio task and record it at the task registry.
        :param topic: registry topic ("ns", "nsi", "vim_account", ...)
        :param item_id: identifier of the item the operation acts on
        :param op_id: identifier of the operation
        :param task_name: name under which the task is registered
        :param coro: coroutine object to run
        :return: None
        """
        task = asyncio.ensure_future(coro)
        self.lcm_tasks.register(topic, item_id, op_id, task_name, task)

    async def check_RO_version(self):
        """
        Connect to RO and check that its version is at least min_RO_version. The connection is retried every
        5 seconds, up to 14 attempts.
        :raises LcmException: if the RO version is too old or RO cannot be contacted after all the attempts
        :return: None
        """
        tries = 14
        last_error = None
        while True:
            try:
                ro_server = ROclient.ROClient(self.loop, **self.ro_config)
                ro_version = await ro_server.get_version()
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {}".format(ro_version))
                return
            except ROclient.ROClientException as e:
                tries -= 1
                error_text = "Error while connecting to RO on {}: {}".format(self.ro_config["endpoint_url"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # log only when the error changes, to avoid repeating the same line at every attempt
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        """
        Task launched by the "test" command. It only logs the received parameter.
        :param param: anything, it is written to the log
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        """
        Endless task that periodically writes a ping addressed to this same worker at the "admin" topic.
        kafka_read_callback() sets self.pings_not_received to 0 when the ping comes back. Pings are sent every
        ping_interval_boot seconds until the first one comes back, and every ping_interval_pace afterwards.
        :raises LcmException: when more than 10 pings in a row have not been received back
        :raises Exception: the last error of the message bus, once too many consecutive errors have happened
        :return: it does not return normally
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # time between pings are low when it is not received and at starting
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # While first_start is True nothing has worked yet since starting, so more errors are
                # tolerated and the wait is longer, to allow kafka to start.
                # The limit must be selected before comparing: "a == 8 if c else 30" is parsed as
                # "(a == 8) if c else 30", which made the first error at starting abort this task.
                if consecutive_errors >= self._max_consecutive_errors(first_start):
                    self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time)

    def kafka_read_callback(self, topic, command, params):
        """
        Process one message received from the bus. Depending on topic and command, it launches the proper
        operation as an asyncio task and records it at self.lcm_tasks. Any received message resets
        self.consecutive_errors and self.first_start.
        :param topic: topic of the message: "admin", "k8scluster", "k8srepo", "ns", "nsi", "vim_account",
            "wim_account" or "sdn"
        :param command: command (key) of the message
        :param params: content of the message. A dictionary for most commands; the item id for "show"
        :raises LcmExceptionExit: when command is "exit"
        :return: None
        """
        # NOTE(review): order_id is a local variable, so the value passed to the handlers below is always 2.
        # It looks like it was meant to be a counter across messages; confirm before changing it.
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    # ping sent by another LCM worker
                    return
                self.pings_not_received = 0
                # store the time of this ping; it is what the --health-check option inspects
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "k8scluster":
            if command in ("create", "created"):
                k8scluster_id = params.get("_id")
                self._register_task("k8scluster", k8scluster_id, order_id, "k8scluster_create",
                                    self.k8scluster.create(params, order_id))
                return
            elif command in ("delete", "deleted"):
                k8scluster_id = params.get("_id")
                self._register_task("k8scluster", k8scluster_id, order_id, "k8scluster_delete",
                                    self.k8scluster.delete(params, order_id))
                return
        elif topic == "k8srepo":
            if command in ("create", "created"):
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                self._register_task("k8srepo", k8srepo_id, order_id, "k8srepo_create",
                                    self.k8srepo.create(params, order_id))
                return
            elif command in ("delete", "deleted"):
                k8srepo_id = params.get("_id")
                self._register_task("k8srepo", k8srepo_id, order_id, "k8srepo_delete",
                                    self.k8srepo.delete(params, order_id))
                return
        elif topic == "ns":
            if command in ("instantiate", "instantiated"):
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop_id = params["_id"]
                nsr_id = params["nsInstanceId"]
                self._register_task("ns", nsr_id, nslcmop_id, "ns_instantiate",
                                    self.ns.instantiate(nsr_id, nslcmop_id))
                return
            elif command in ("terminate", "terminated"):
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop_id = params["_id"]
                nsr_id = params["nsInstanceId"]
                # cancel whatever is still running over this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                self._register_task("ns", nsr_id, nslcmop_id, "ns_terminate",
                                    self.ns.terminate(nsr_id, nslcmop_id))
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop_id = params["_id"]
                nsr_id = params["nsInstanceId"]
                self._register_task("ns", nsr_id, nslcmop_id, "ns_action", self.ns.action(nsr_id, nslcmop_id))
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop_id = params["_id"]
                nsr_id = params["nsInstanceId"]
                self._register_task("ns", nsr_id, nslcmop_id, "ns_scale", self.ns.scale(nsr_id, nslcmop_id))
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): self.lcm_ns_tasks is never assigned in this class, so this raises
                    # AttributeError and the "not found" message below is what gets printed. The way of
                    # reading tasks from self.lcm_tasks is not visible from here; fix it against TaskRegistry.
                    print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command in ("instantiate", "instantiated"):
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop_id = params["_id"]  # slice operation id
                nsir_id = params["netsliceInstanceId"]  # slice record id
                self._register_task("nsi", nsir_id, nsilcmop_id, "nsi_instantiate",
                                    self.netslice.instantiate(nsir_id, nsilcmop_id))
                return
            elif command in ("terminate", "terminated"):
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop_id = params["_id"]  # slice operation id
                nsir_id = params["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                self._register_task("nsi", nsir_id, nsilcmop_id, "nsi_terminate",
                                    self.netslice.terminate(nsir_id, nsilcmop_id))
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): self.lcm_netslice_tasks is never assigned in this class; same problem
                    # as with the "show" command of topic "ns".
                    print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                self._register_task("vim_account", vim_id, order_id, "vim_create",
                                    self.vim.create(params, order_id))
                return
            elif command in ("delete", "deleted"):
                self.lcm_tasks.cancel(topic, vim_id)
                self._register_task("vim_account", vim_id, order_id, "vim_delete",
                                    self.vim.delete(params, order_id))
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                self._register_task("vim_account", vim_id, order_id, "vim_edit", self.vim.edit(params, order_id))
                return
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                self._register_task("wim_account", wim_id, order_id, "wim_create",
                                    self.wim.create(params, order_id))
                return
            elif command in ("delete", "deleted"):
                self.lcm_tasks.cancel(topic, wim_id)
                self._register_task("wim_account", wim_id, order_id, "wim_delete",
                                    self.wim.delete(params, order_id))
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                self._register_task("wim_account", wim_id, order_id, "wim_edit", self.wim.edit(params, order_id))
                return
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                self._register_task("sdn", _sdn_id, order_id, "sdn_create", self.sdn.create(params, order_id))
                return
            elif command in ("delete", "deleted"):
                self.lcm_tasks.cancel(topic, _sdn_id)
                self._register_task("sdn", _sdn_id, order_id, "sdn_delete", self.sdn.delete(params, order_id))
                return
            elif command in ("edit", "edited"):
                self._register_task("sdn", _sdn_id, order_id, "sdn_edit", self.sdn.edit(params, order_id))
                return
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        """
        Task that reads the message bus and passes every message to kafka_read_callback(). Operation topics
        are read with self.msg and the "admin" topic with self.msg_admin. Reading is retried after an error.
        It ends when the "exit" command is received or when 10 consecutive errors are reached.
        :raises Exception: the last error of the message bus, once too many consecutive errors have happened
        :return: None
        """
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # While self.first_start is True nothing has been received yet since starting, so more
                # errors are tolerated and the wait is longer, to allow kafka to start. In that case the
                # limit is higher than the bound of the loop, so the loop ends without raising.
                # The limit must be selected before comparing: "a == 8 if c else 30" is parsed as
                # "(a == 8) if c else 30", which made the first error at starting abort this task.
                if self.consecutive_errors >= self._max_consecutive_errors(self.first_start):
                    self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):
        """
        Run LCM: check the RO version, then run the kafka_read and kafka_ping tasks until both finish.
        Afterwards the event loop is closed and database, message bus and storage are disconnected.
        :raises LcmException: from check_RO_version() or kafka_ping()
        :return: None
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        """
        Load the configuration file and override its values with the OSMLCM_* environment variables.
        A variable OSMLCM_<SECTION>_<KEY> overrides conf[<section>][<key>]; more "_" separated items address
        deeper levels. Section and key are lowercased, except the sections "RO" and "VCA". A key called
        "port" is converted to integer. Variables that cannot be applied are skipped with a warning.
        The process exits with code 1 if the file cannot be read or parsed.
        :param config_file: path of the configuration file, in YAML format
        :return: dictionary with the configuration
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # NOTE(review): yaml.Loader can build arbitrary python objects. It is acceptable only while
                # the configuration file is trusted; consider yaml.safe_load.
                conf = yaml.load(f, Loader=yaml.Loader)
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                k_items = k.lower().split("_")
                if len(k_items) < 3:
                    continue
                if k_items[1] in ("ro", "vca"):
                    # put in capital letter
                    k_items[1] = k_items[1].upper()
                c = conf
                try:
                    # walk down to the dictionary that holds the last key
                    for k_item in k_items[1:-1]:
                        c = c[k_item]
                    if k_items[-1] == "port":
                        c[k_items[-1]] = int(v)
                    else:
                        c[k_items[-1]] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            sys.exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
        will provide a random one
        :return: Obtained ID
        """
        # Try getting docker id: up to 12 characters of what follows the last "/" at the first line of the
        # cgroup file. If this fails or it is empty, a random id is generated
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            # best effort: the file may not exist or not be readable (e.g. not running on linux)
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
577
578
def usage():
    """Print the command line help to stdout, using sys.argv[0] as the program name."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """
    print(help_text.format(sys.argv[0]))
    # Options not implemented yet:
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
587
588
def health_check():
    """
    Check whether LCM is healthy by inspecting the time of the last received ping stored at
    health_check_file. The file is read up to two times, waiting 6 seconds between them.
    This function never returns: it ends the process with exit code 0 if the stored time is more recent than
    Lcm.ping_interval_pace + 10 seconds, or with exit code 1 otherwise.
    """
    for attempts_left in (1, 0):
        try:
            with open(health_check_file, "r") as f:
                last_received_ping = f.read()

            if time() - float(last_received_ping) < Lcm.ping_interval_pace + 10:
                # sys.exit is used because the builtin exit() is only provided by the site module
                sys.exit(0)
        except (OSError, ValueError):
            # file missing, not readable or without a valid time: considered as not healthy at this attempt
            pass
        if attempts_left:
            sleep(6)
    sys.exit(1)
604
605
if __name__ == '__main__':
    # Entry point when run as a script: parse the command line, locate the configuration file and run LCM

    try:
        print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # health_check() ends the process with the result as exit code
                health_check()
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                # getopt already rejects unknown options. An exception is used instead of an assert because
                # asserts are removed when running with -O; it is reported by the handler below.
                raise getopt.GetoptError("Unhandled option '{}'".format(o), o)

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                sys.exit(1)
        else:
            # look for it next to this script (same name with extension .cfg), then at the current folder
            # and finally at /etc/osm
            for config_file in (path.splitext(__file__)[0] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                sys.exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)