fix 1015. move health check to other module to avoid import unneeded libraries
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
50 if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
51 pdb.set_trace()
52
53
54 __author__ = "Alfonso Tierno"
55 min_RO_version = "6.0.2"
56 min_n2vc_version = "0.0.2"
57
58 min_common_version = "0.1.19"
59 health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file
60
61
62 class Lcm:
63
64 ping_interval_pace = 120 # how many time ping is send once is confirmed all is running
65 ping_interval_boot = 5 # how many time ping is sent when booting
66
67 def __init__(self, config_file, loop=None):
68 """
69 Init, Connect to database, filesystem storage, and messaging
70 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
71 :return: None
72 """
73 self.db = None
74 self.msg = None
75 self.msg_admin = None
76 self.fs = None
77 self.pings_not_received = 1
78 self.consecutive_errors = 0
79 self.first_start = False
80
81 # logging
82 self.logger = logging.getLogger('lcm')
83 # get id
84 self.worker_id = self.get_process_id()
85 # load configuration
86 config = self.read_config_file(config_file)
87 self.config = config
88 self.config["ro_config"] = {
89 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
90 "tenant": config.get("tenant", "osm"),
91 "logger_name": "lcm.ROclient",
92 "loglevel": "ERROR",
93 }
94
95 self.loop = loop or asyncio.get_event_loop()
96
97 # logging
98 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
99 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
100 config["database"]["logger_name"] = "lcm.db"
101 config["storage"]["logger_name"] = "lcm.fs"
102 config["message"]["logger_name"] = "lcm.msg"
103 if config["global"].get("logfile"):
104 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
105 maxBytes=100e6, backupCount=9, delay=0)
106 file_handler.setFormatter(log_formatter_simple)
107 self.logger.addHandler(file_handler)
108 if not config["global"].get("nologging"):
109 str_handler = logging.StreamHandler()
110 str_handler.setFormatter(log_formatter_simple)
111 self.logger.addHandler(str_handler)
112
113 if config["global"].get("loglevel"):
114 self.logger.setLevel(config["global"]["loglevel"])
115
116 # logging other modules
117 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
118 config[k1]["logger_name"] = logname
119 logger_module = logging.getLogger(logname)
120 if config[k1].get("logfile"):
121 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
122 maxBytes=100e6, backupCount=9, delay=0)
123 file_handler.setFormatter(log_formatter_simple)
124 logger_module.addHandler(file_handler)
125 if config[k1].get("loglevel"):
126 logger_module.setLevel(config[k1]["loglevel"])
127 self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))
128
129 # check version of N2VC
130 # TODO enhance with int conversion or from distutils.version import LooseVersion
131 # or with list(map(int, version.split(".")))
132 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
133 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
134 n2vc_version, min_n2vc_version))
135 # check version of common
136 if versiontuple(common_version) < versiontuple(min_common_version):
137 raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
138 common_version, min_common_version))
139
140 try:
141 # TODO check database version
142 if config["database"]["driver"] == "mongo":
143 self.db = dbmongo.DbMongo()
144 self.db.db_connect(config["database"])
145 elif config["database"]["driver"] == "memory":
146 self.db = dbmemory.DbMemory()
147 self.db.db_connect(config["database"])
148 else:
149 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
150 config["database"]["driver"]))
151
152 if config["storage"]["driver"] == "local":
153 self.fs = fslocal.FsLocal()
154 self.fs.fs_connect(config["storage"])
155 elif config["storage"]["driver"] == "mongo":
156 self.fs = fsmongo.FsMongo()
157 self.fs.fs_connect(config["storage"])
158 else:
159 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
160 config["storage"]["driver"]))
161
162 # copy message configuration in order to remove 'group_id' for msg_admin
163 config_message = config["message"].copy()
164 config_message["loop"] = self.loop
165 if config_message["driver"] == "local":
166 self.msg = msglocal.MsgLocal()
167 self.msg.connect(config_message)
168 self.msg_admin = msglocal.MsgLocal()
169 config_message.pop("group_id", None)
170 self.msg_admin.connect(config_message)
171 elif config_message["driver"] == "kafka":
172 self.msg = msgkafka.MsgKafka()
173 self.msg.connect(config_message)
174 self.msg_admin = msgkafka.MsgKafka()
175 config_message.pop("group_id", None)
176 self.msg_admin.connect(config_message)
177 else:
178 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
179 config["message"]["driver"]))
180 except (DbException, FsException, MsgException) as e:
181 self.logger.critical(str(e), exc_info=True)
182 raise LcmException(str(e))
183
184 # contains created tasks/futures to be able to cancel
185 self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
186
187 self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
188 self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
189 self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
190 self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
191 self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
192 self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
193 self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
194
195 async def check_RO_version(self):
196 tries = 14
197 last_error = None
198 while True:
199 try:
200 ro_server = ROclient.ROClient(self.loop, **self.config["ro_config"])
201 ro_version = await ro_server.get_version()
202 if versiontuple(ro_version) < versiontuple(min_RO_version):
203 raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
204 ro_version, min_RO_version))
205 self.logger.info("Connected to RO version {}".format(ro_version))
206 return
207 except ROclient.ROClientException as e:
208 tries -= 1
209 error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["endpoint_url"],
210 e)
211 if tries <= 0:
212 self.logger.critical(error_text)
213 raise LcmException(error_text)
214 if last_error != error_text:
215 last_error = error_text
216 self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
217 await asyncio.sleep(5)
218
219 async def test(self, param=None):
220 self.logger.debug("Starting/Ending test task: {}".format(param))
221
222 async def kafka_ping(self):
223 self.logger.debug("Task kafka_ping Enter")
224 consecutive_errors = 0
225 first_start = True
226 kafka_has_received = False
227 self.pings_not_received = 1
228 while True:
229 try:
230 await self.msg_admin.aiowrite(
231 "admin", "ping",
232 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
233 self.loop)
234 # time between pings are low when it is not received and at starting
235 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
236 if not self.pings_not_received:
237 kafka_has_received = True
238 self.pings_not_received += 1
239 await asyncio.sleep(wait_time, loop=self.loop)
240 if self.pings_not_received > 10:
241 raise LcmException("It is not receiving pings from Kafka bus")
242 consecutive_errors = 0
243 first_start = False
244 except LcmException:
245 raise
246 except Exception as e:
247 # if not first_start is the first time after starting. So leave more time and wait
248 # to allow kafka starts
249 if consecutive_errors == 8 if not first_start else 30:
250 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
251 raise
252 consecutive_errors += 1
253 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
254 wait_time = 2 if not first_start else 5
255 await asyncio.sleep(wait_time, loop=self.loop)
256
257 def kafka_read_callback(self, topic, command, params):
258 order_id = 1
259
260 if topic != "admin" and command != "ping":
261 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
262 self.consecutive_errors = 0
263 self.first_start = False
264 order_id += 1
265 if command == "exit":
266 raise LcmExceptionExit
267 elif command.startswith("#"):
268 return
269 elif command == "echo":
270 # just for test
271 print(params)
272 sys.stdout.flush()
273 return
274 elif command == "test":
275 asyncio.Task(self.test(params), loop=self.loop)
276 return
277
278 if topic == "admin":
279 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
280 if params.get("worker_id") != self.worker_id:
281 return
282 self.pings_not_received = 0
283 try:
284 with open(health_check_file, "w") as f:
285 f.write(str(time()))
286 except Exception as e:
287 self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
288 return
289 elif topic == "pla":
290 if command == "placement":
291 self.ns.update_nsrs_with_pla_result(params)
292 return
293 elif topic == "k8scluster":
294 if command == "create" or command == "created":
295 k8scluster_id = params.get("_id")
296 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
297 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
298 return
299 elif command == "delete" or command == "deleted":
300 k8scluster_id = params.get("_id")
301 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
302 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
303 return
304 elif topic == "k8srepo":
305 if command == "create" or command == "created":
306 k8srepo_id = params.get("_id")
307 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
308 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
309 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
310 return
311 elif command == "delete" or command == "deleted":
312 k8srepo_id = params.get("_id")
313 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
314 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
315 return
316 elif topic == "ns":
317 if command == "instantiate":
318 # self.logger.debug("Deploying NS {}".format(nsr_id))
319 nslcmop = params
320 nslcmop_id = nslcmop["_id"]
321 nsr_id = nslcmop["nsInstanceId"]
322 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
323 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
324 return
325 elif command == "terminate":
326 # self.logger.debug("Deleting NS {}".format(nsr_id))
327 nslcmop = params
328 nslcmop_id = nslcmop["_id"]
329 nsr_id = nslcmop["nsInstanceId"]
330 self.lcm_tasks.cancel(topic, nsr_id)
331 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
332 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
333 return
334 elif command == "action":
335 # self.logger.debug("Update NS {}".format(nsr_id))
336 nslcmop = params
337 nslcmop_id = nslcmop["_id"]
338 nsr_id = nslcmop["nsInstanceId"]
339 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
340 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
341 return
342 elif command == "scale":
343 # self.logger.debug("Update NS {}".format(nsr_id))
344 nslcmop = params
345 nslcmop_id = nslcmop["_id"]
346 nsr_id = nslcmop["nsInstanceId"]
347 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
348 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
349 return
350 elif command == "show":
351 nsr_id = params
352 try:
353 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
354 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
355 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
356 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
357 db_nsr["detailed-status"],
358 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
359 except Exception as e:
360 print("nsr {} not found: {}".format(nsr_id, e))
361 sys.stdout.flush()
362 return
363 elif command == "deleted":
364 return # TODO cleaning of task just in case should be done
365 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
366 return
367 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
368 if command == "instantiate":
369 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
370 nsilcmop = params
371 nsilcmop_id = nsilcmop["_id"] # slice operation id
372 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
373 task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
374 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
375 return
376 elif command == "terminate":
377 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
378 nsilcmop = params
379 nsilcmop_id = nsilcmop["_id"] # slice operation id
380 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
381 self.lcm_tasks.cancel(topic, nsir_id)
382 task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
383 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
384 return
385 elif command == "show":
386 nsir_id = params
387 try:
388 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
389 print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
390 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
391 "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
392 db_nsir["detailed-status"],
393 db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
394 except Exception as e:
395 print("nsir {} not found: {}".format(nsir_id, e))
396 sys.stdout.flush()
397 return
398 elif command == "deleted":
399 return # TODO cleaning of task just in case should be done
400 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
401 return
402 elif topic == "vim_account":
403 vim_id = params["_id"]
404 if command in ("create", "created"):
405 task = asyncio.ensure_future(self.vim.create(params, order_id))
406 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
407 return
408 elif command == "delete" or command == "deleted":
409 self.lcm_tasks.cancel(topic, vim_id)
410 task = asyncio.ensure_future(self.vim.delete(params, order_id))
411 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
412 return
413 elif command == "show":
414 print("not implemented show with vim_account")
415 sys.stdout.flush()
416 return
417 elif command in ("edit", "edited"):
418 task = asyncio.ensure_future(self.vim.edit(params, order_id))
419 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
420 return
421 elif command == "deleted":
422 return # TODO cleaning of task just in case should be done
423 elif topic == "wim_account":
424 wim_id = params["_id"]
425 if command in ("create", "created"):
426 task = asyncio.ensure_future(self.wim.create(params, order_id))
427 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
428 return
429 elif command == "delete" or command == "deleted":
430 self.lcm_tasks.cancel(topic, wim_id)
431 task = asyncio.ensure_future(self.wim.delete(params, order_id))
432 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
433 return
434 elif command == "show":
435 print("not implemented show with wim_account")
436 sys.stdout.flush()
437 return
438 elif command in ("edit", "edited"):
439 task = asyncio.ensure_future(self.wim.edit(params, order_id))
440 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
441 return
442 elif command == "deleted":
443 return # TODO cleaning of task just in case should be done
444 elif topic == "sdn":
445 _sdn_id = params["_id"]
446 if command in ("create", "created"):
447 task = asyncio.ensure_future(self.sdn.create(params, order_id))
448 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
449 return
450 elif command == "delete" or command == "deleted":
451 self.lcm_tasks.cancel(topic, _sdn_id)
452 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
453 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
454 return
455 elif command in ("edit", "edited"):
456 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
457 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
458 return
459 elif command == "deleted":
460 return # TODO cleaning of task just in case should be done
461 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
462
463 async def kafka_read(self):
464 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
465 # future = asyncio.Future()
466 self.consecutive_errors = 0
467 self.first_start = True
468 while self.consecutive_errors < 10:
469 try:
470 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
471 topics_admin = ("admin", )
472 await asyncio.gather(
473 self.msg.aioread(topics, self.loop, self.kafka_read_callback),
474 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
475 )
476
477 except LcmExceptionExit:
478 self.logger.debug("Bye!")
479 break
480 except Exception as e:
481 # if not first_start is the first time after starting. So leave more time and wait
482 # to allow kafka starts
483 if self.consecutive_errors == 8 if not self.first_start else 30:
484 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
485 raise
486 self.consecutive_errors += 1
487 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
488 wait_time = 2 if not self.first_start else 5
489 await asyncio.sleep(wait_time, loop=self.loop)
490
491 # self.logger.debug("Task kafka_read terminating")
492 self.logger.debug("Task kafka_read exit")
493
494 def start(self):
495
496 # check RO version
497 self.loop.run_until_complete(self.check_RO_version())
498
499 self.loop.run_until_complete(asyncio.gather(
500 self.kafka_read(),
501 self.kafka_ping()
502 ))
503 # TODO
504 # self.logger.debug("Terminating cancelling creation tasks")
505 # self.lcm_tasks.cancel("ALL", "create")
506 # timeout = 200
507 # while self.is_pending_tasks():
508 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
509 # await asyncio.sleep(2, loop=self.loop)
510 # timeout -= 2
511 # if not timeout:
512 # self.lcm_tasks.cancel("ALL", "ALL")
513 self.loop.close()
514 self.loop = None
515 if self.db:
516 self.db.db_disconnect()
517 if self.msg:
518 self.msg.disconnect()
519 if self.msg_admin:
520 self.msg_admin.disconnect()
521 if self.fs:
522 self.fs.fs_disconnect()
523
524 def read_config_file(self, config_file):
525 # TODO make a [ini] + yaml inside parser
526 # the configparser library is not suitable, because it does not admit comments at the end of line,
527 # and not parse integer or boolean
528 try:
529 # read file as yaml format
530 with open(config_file) as f:
531 conf = yaml.load(f, Loader=yaml.Loader)
532 # Ensure all sections are not empty
533 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
534 if not conf.get(k):
535 conf[k] = {}
536
537 # read all environ that starts with OSMLCM_
538 for k, v in environ.items():
539 if not k.startswith("OSMLCM_"):
540 continue
541 subject, _, item = k[7:].lower().partition("_")
542 if not item:
543 continue
544 if subject in ("ro", "vca"):
545 # put in capital letter
546 subject = subject.upper()
547 try:
548 if item == "port" or subject == "timeout":
549 conf[subject][item] = int(v)
550 else:
551 conf[subject][item] = v
552 except Exception as e:
553 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
554
555 # backward compatibility of VCA parameters
556
557 if 'pubkey' in conf["VCA"]:
558 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
559 if 'cacert' in conf["VCA"]:
560 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
561 if 'apiproxy' in conf["VCA"]:
562 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
563
564 if 'enableosupgrade' in conf["VCA"]:
565 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
566 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
567 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
568 conf["VCA"]['enable_os_upgrade'] = False
569 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
570 conf["VCA"]['enable_os_upgrade'] = True
571
572 if 'aptmirror' in conf["VCA"]:
573 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
574
575 return conf
576 except Exception as e:
577 self.logger.critical("At config file '{}': {}".format(config_file, e))
578 exit(1)
579
580 @staticmethod
581 def get_process_id():
582 """
583 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
584 will provide a random one
585 :return: Obtained ID
586 """
587 # Try getting docker id. If fails, get pid
588 try:
589 with open("/proc/self/cgroup", "r") as f:
590 text_id_ = f.readline()
591 _, _, text_id = text_id_.rpartition("/")
592 text_id = text_id.replace('\n', '')[:12]
593 if text_id:
594 return text_id
595 except Exception:
596 pass
597 # Return a random id
598 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
599
600
601 def usage():
602 print("""Usage: {} [options]
603 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
604 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
605 -h|--help: shows this help
606 """.format(sys.argv[0]))
607 # --log-socket-host HOST: send logs to this host")
608 # --log-socket-port PORT: send logs using this port (default: 9022)")
609
610
611 if __name__ == '__main__':
612
613 try:
614 # print("SYS.PATH='{}'".format(sys.path))
615 # load parameters and configuration
616 # -h
617 # -c value
618 # --config value
619 # --help
620 # --health-check
621 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
622 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
623 config_file = None
624 for o, a in opts:
625 if o in ("-h", "--help"):
626 usage()
627 sys.exit()
628 elif o in ("-c", "--config"):
629 config_file = a
630 elif o == "--health-check":
631 from osm_lcm.lcm_hc import health_check
632 health_check(health_check_file, Lcm.ping_interval_pace)
633 # elif o == "--log-socket-port":
634 # log_socket_port = a
635 # elif o == "--log-socket-host":
636 # log_socket_host = a
637 # elif o == "--log-file":
638 # log_file = a
639 else:
640 assert False, "Unhandled option"
641
642 if config_file:
643 if not path.isfile(config_file):
644 print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
645 exit(1)
646 else:
647 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
648 if path.isfile(config_file):
649 break
650 else:
651 print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
652 exit(1)
653 lcm = Lcm(config_file)
654 lcm.start()
655 except (LcmException, getopt.GetoptError) as e:
656 print(str(e), file=sys.stderr)
657 # usage()
658 exit(1)