365272bb4349aabac126864fd263511a6ebacc2a
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns
33 from osm_lcm import vim_sdn
34 from osm_lcm import netslice
35 from osm_lcm import ROclient
36
37 from time import time, sleep
38 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
39 from osm_lcm import version as _lcm_version, version_date as lcm_version_date
40
41 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
42 from osm_common import version as common_version
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45 from osm_common.msgbase import MsgException
46 from os import environ, path
47 from random import choice as random_choice
48 from n2vc import version as n2vc_version
49
# Developer aid: when the environment variable OSMLCM_PDB_DEBUG is defined (with any value, even empty)
# the module stops at the python debugger as soon as it is imported
if 'OSMLCM_PDB_DEBUG' in os.environ:
    pdb.set_trace()


__author__ = "Alfonso Tierno"

# minimum versions of the other OSM components checked by this module
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"

# version reported by this module; _get_version() may overwrite it with the installed package version
lcm_version = _lcm_version

# file where the timestamp of the last received ping is stored; it is read back by health_check()
# TODO find better location for this file
health_check_file = "{}/time_last_ping".format(path.expanduser("~"))
61
62
63 class Lcm:
64
65 ping_interval_pace = 120 # how many time ping is send once is confirmed all is running
66 ping_interval_boot = 5 # how many time ping is sent when booting
67
def __init__(self, config_file, loop=None):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: path of the configuration file. It is parsed by read_config_file() into a two level
        dictionary; the sections 'global', 'RO', 'database', 'storage' and 'message' are used here
    :param loop: asyncio event loop to use. When not provided, asyncio.get_event_loop() is used
    :return: None
    :raises LcmException: if the N2VC or common versions are older than required, if a configured 'driver'
        is not supported, or if the connection to database, storage or message bus fails
    """
    self.db = None
    self.msg = None
    self.msg_admin = None
    self.fs = None
    self.pings_not_received = 1
    self.consecutive_errors = 0
    self.first_start = False

    # logging
    self.logger = logging.getLogger('lcm')
    # get id
    self.worker_id = self.get_process_id()
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.config["ro_config"] = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.loop = loop or asyncio.get_event_loop()

    # logging
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if config["global"].get("logfile"):
        file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                            maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    if not config["global"].get("nologging"):
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules. The 'logger_name' is injected in each section so that the corresponding
    # driver receives it at connect time
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if config[k1].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if config[k1].get("loglevel"):
            logger_module.setLevel(config[k1]["loglevel"])
    self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

    # check version of N2VC
    # TODO enhance with int conversion or from distutils.version import LooseVersion
    # or with list(map(int, version.split(".")))
    if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
        raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
            n2vc_version, min_n2vc_version))
    # check version of common
    if versiontuple(common_version) < versiontuple(min_common_version):
        raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
            common_version, min_common_version))

    try:
        # TODO check database version
        db_driver = config["database"]["driver"]
        if db_driver == "mongo":
            self.db = dbmongo.DbMongo()
        elif db_driver == "memory":
            self.db = dbmemory.DbMemory()
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
        self.db.db_connect(config["database"])

        fs_driver = config["storage"]["driver"]
        if fs_driver == "local":
            self.fs = fslocal.FsLocal()
        elif fs_driver == "mongo":
            self.fs = fsmongo.FsMongo()
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
        self.fs.fs_connect(config["storage"])

        # copy message configuration in order to remove 'group_id' for msg_admin
        config_message = config["message"].copy()
        config_message["loop"] = self.loop
        msg_driver = config_message["driver"]
        if msg_driver == "local":
            msg_class = msglocal.MsgLocal
        elif msg_driver == "kafka":
            msg_class = msgkafka.MsgKafka
        else:
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
        self.msg = msg_class()
        self.msg.connect(config_message)
        # second connection, without 'group_id', used for the 'admin' topic (pings)
        self.msg_admin = msg_class()
        config_message.pop("group_id", None)
        self.msg_admin.connect(config_message)
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        # keep the original exception as the explicit cause
        raise LcmException(str(e)) from e

    # contains created tasks/futures to be able to cancel
    self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

    # one handler object per kind of item managed; all of them share connections, task registry and loop
    self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
    self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
    self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
    self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
    self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
    self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
    self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
196
async def check_RO_version(self):
    """
    Ask RO for its version and check that it is at least min_RO_version. Connection errors are retried
    every 5 seconds, up to 14 attempts
    :return: None
    :raises LcmException: if the RO version is too old, or if RO cannot be contacted after all the attempts
    """
    previous_error = None
    # 'attempts_left' is the number of attempts remaining after the current one: 13, 12, ..., 0
    for attempts_left in range(13, -1, -1):
        try:
            ro_client = ROclient.ROClient(self.loop, **self.config["ro_config"])
            found_version = await ro_client.get_version()
            if versiontuple(found_version) < versiontuple(min_RO_version):
                # not a connection error, so it is not retried
                raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                    found_version, min_RO_version))
            self.logger.info("Connected to RO version {}".format(found_version))
            return
        except ROclient.ROClientException as e:
            error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["endpoint_url"],
                                                                         e)
            if attempts_left <= 0:
                self.logger.critical(error_text)
                raise LcmException(error_text)
            # log only when the error changes, to avoid flooding the log with the same message
            if error_text != previous_error:
                previous_error = error_text
                self.logger.error(error_text + ". Waiting until {} seconds".format(5*attempts_left))
            await asyncio.sleep(5)
220
221 async def test(self, param=None):
222 self.logger.debug("Starting/Ending test task: {}".format(param))
223
async def kafka_ping(self):
    """
    Endless task that periodically writes a 'ping' message, addressed to this same worker, at the 'admin'
    topic. The reception of these pings (see kafka_read_callback, that resets self.pings_not_received) is
    what proves that the message bus is working
    :return: it does not return normally
    :raises LcmException: when more than 10 consecutive pings have been sent without receiving any
    :raises Exception: the last error obtained when writing fails too many consecutive times
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg_admin.aiowrite(
                "admin", "ping",
                {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                self.loop)
            # time between pings are low when it is not received and at starting
            wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # the 'loop' argument of asyncio.sleep is not passed: it was removed in python 3.10 and the
            # running loop is used by default
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # if first_start is True this is the first time after starting. So leave more time and wait
            # to allow kafka starts.
            # The limit is computed apart: 'a == 8 if cond else 30' is parsed as '(a == 8) if cond else 30',
            # which is always true at first start and made the task give up at the very first error
            max_errors = 30 if first_start else 8
            if consecutive_errors == max_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 2 if not first_start else 5
            await asyncio.sleep(wait_time)
259
def kafka_read_callback(self, topic, command, params):
    """
    Callback invoked for each message read from the bus (see kafka_read). The message is dispatched
    according to 'topic' and 'command'; operations over ns, nsi, vim_account, wim_account, sdn, k8scluster
    and k8srepo are launched as asyncio tasks and registered at self.lcm_tasks
    :param topic: topic of the message
    :param command: command of the message. Commands starting with '#' are ignored
    :param params: content of the message. The expected content depends on topic and command
    :return: None
    :raises LcmExceptionExit: when the command is 'exit'
    """
    # The original code initialised this to 1 and incremented it once per call, so it was always 2
    order_id = 2

    # log everything but the periodic admin/ping messages
    if not (topic == "admin" and command == "ping"):
        self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
    # a message has been received, so the bus is working: reset the error accounting of kafka_read
    self.consecutive_errors = 0
    self.first_start = False

    # commands valid for any topic
    if command == "exit":
        raise LcmExceptionExit
    elif command.startswith("#"):
        return
    elif command == "echo":
        # just for test
        print(params)
        sys.stdout.flush()
        return
    elif command == "test":
        asyncio.Task(self.test(params), loop=self.loop)
        return

    if topic == "admin":
        if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
            if params.get("worker_id") != self.worker_id:
                # ping sent by another LCM worker
                return
            self.pings_not_received = 0
            # store the time of the last ping received, to be inspected by health_check()
            try:
                with open(health_check_file, "w") as f:
                    f.write(str(time()))
            except Exception as e:
                self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
        return
    elif topic == "k8scluster":
        if command == "create" or command == "created":
            k8scluster_id = params.get("_id")
            task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
            self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
            return
        elif command == "delete" or command == "deleted":
            k8scluster_id = params.get("_id")
            task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
            self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
            return
    elif topic == "k8srepo":
        if command == "create" or command == "created":
            k8srepo_id = params.get("_id")
            self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
            task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
            self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
            return
        elif command == "delete" or command == "deleted":
            k8srepo_id = params.get("_id")
            task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
            self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
            return
    elif topic == "ns":
        # NOTE(review): 'instantiated' and 'terminated' launch the operation as well, so the same words
        # at the tuple of ignored commands below are never reached. Confirm that this is intended
        if command == "instantiate" or command == "instantiated":
            # self.logger.debug("Deploying NS {}".format(nsr_id))
            nslcmop = params
            nslcmop_id = nslcmop["_id"]
            nsr_id = nslcmop["nsInstanceId"]
            task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
            return
        elif command == "terminate" or command == "terminated":
            # self.logger.debug("Deleting NS {}".format(nsr_id))
            nslcmop = params
            nslcmop_id = nslcmop["_id"]
            nsr_id = nslcmop["nsInstanceId"]
            # cancel the tasks registered for this ns before launching the termination
            self.lcm_tasks.cancel(topic, nsr_id)
            task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
            return
        elif command == "action":
            # self.logger.debug("Update NS {}".format(nsr_id))
            nslcmop = params
            nslcmop_id = nslcmop["_id"]
            nsr_id = nslcmop["nsInstanceId"]
            task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
            return
        elif command == "scale":
            # self.logger.debug("Update NS {}".format(nsr_id))
            nslcmop = params
            nslcmop_id = nslcmop["_id"]
            nsr_id = nslcmop["nsInstanceId"]
            task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
            return
        elif command == "show":
            nsr_id = params
            try:
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                # NOTE(review): 'lcm_ns_tasks' is not set at __init__ (tasks are kept at self.lcm_tasks),
                # so this is expected to fail with AttributeError and print the 'not found' text below
                print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                      "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                      "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                db_nsr["detailed-status"],
                                db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
            except Exception as e:
                print("nsr {} not found: {}".format(nsr_id, e))
            sys.stdout.flush()
            return
        elif command == "deleted":
            return  # TODO cleaning of task just in case should be done
        elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
            return
    elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
        if command == "instantiate" or command == "instantiated":
            # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
            nsilcmop = params
            nsilcmop_id = nsilcmop["_id"]  # slice operation id
            nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
            task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
            self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
            return
        elif command == "terminate" or command == "terminated":
            # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
            nsilcmop = params
            nsilcmop_id = nsilcmop["_id"]  # slice operation id
            nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
            self.lcm_tasks.cancel(topic, nsir_id)
            task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
            self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
            return
        elif command == "show":
            nsir_id = params
            try:
                db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                # NOTE(review): 'lcm_netslice_tasks' is not set at __init__; same remark as for ns 'show'
                print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                      "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                      "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                db_nsir["detailed-status"],
                                db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
            except Exception as e:
                print("nsir {} not found: {}".format(nsir_id, e))
            sys.stdout.flush()
            return
        elif command == "deleted":
            return  # TODO cleaning of task just in case should be done
        elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
            return
    elif topic == "vim_account":
        vim_id = params["_id"]
        if command in ("create", "created"):
            task = asyncio.ensure_future(self.vim.create(params, order_id))
            self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
            return
        elif command == "delete" or command == "deleted":
            self.lcm_tasks.cancel(topic, vim_id)
            task = asyncio.ensure_future(self.vim.delete(params, order_id))
            self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
            return
        elif command == "show":
            print("not implemented show with vim_account")
            sys.stdout.flush()
            return
        elif command in ("edit", "edited"):
            task = asyncio.ensure_future(self.vim.edit(params, order_id))
            self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
            return
    elif topic == "wim_account":
        wim_id = params["_id"]
        if command in ("create", "created"):
            task = asyncio.ensure_future(self.wim.create(params, order_id))
            self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
            return
        elif command == "delete" or command == "deleted":
            self.lcm_tasks.cancel(topic, wim_id)
            task = asyncio.ensure_future(self.wim.delete(params, order_id))
            self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
            return
        elif command == "show":
            print("not implemented show with wim_account")
            sys.stdout.flush()
            return
        elif command in ("edit", "edited"):
            task = asyncio.ensure_future(self.wim.edit(params, order_id))
            self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
            return
    elif topic == "sdn":
        _sdn_id = params["_id"]
        if command in ("create", "created"):
            task = asyncio.ensure_future(self.sdn.create(params, order_id))
            self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
            return
        elif command == "delete" or command == "deleted":
            self.lcm_tasks.cancel(topic, _sdn_id)
            task = asyncio.ensure_future(self.sdn.delete(params, order_id))
            self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
            return
        elif command in ("edit", "edited"):
            task = asyncio.ensure_future(self.sdn.edit(params, order_id))
            self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
            return
    # reached when the topic is unknown, or when the command is not managed for a known topic
    self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
461
async def kafka_read(self):
    """
    Task that reads the message bus and delivers each message to kafka_read_callback. Two readers run
    concurrently: self.msg for the operation topics and self.msg_admin (without consumer group) for the
    'admin' topic. The counter self.consecutive_errors and the flag self.first_start are reset by
    kafka_read_callback as soon as a message is received
    :return: None. It ends when an 'exit' command is received or when the retries are exhausted
    :raises Exception: the last error obtained when reading fails too many consecutive times
    """
    self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
    # future = asyncio.Future()
    self.consecutive_errors = 0
    self.first_start = True
    # NOTE(review): this bound (10) is lower than the limit of 30 errors allowed below at first start,
    # so at first start the loop finishes silently after 10 errors. Confirm which limit is the desired one
    while self.consecutive_errors < 10:
        try:
            topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
            topics_admin = ("admin", )
            await asyncio.gather(
                self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
            )

        except LcmExceptionExit:
            self.logger.debug("Bye!")
            break
        except Exception as e:
            # if first_start is True this is the first time after starting. So leave more time and wait
            # to allow kafka starts.
            # The limit is computed apart: 'a == 8 if cond else 30' is parsed as '(a == 8) if cond else 30',
            # which is always true at first start and made the task give up at the very first error
            max_errors = 30 if self.first_start else 8
            if self.consecutive_errors == max_errors:
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            self.consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 2 if not self.first_start else 5
            # the 'loop' argument of asyncio.sleep is not passed: it was removed in python 3.10 and the
            # running loop is used by default
            await asyncio.sleep(wait_time)

    # self.logger.debug("Task kafka_read terminating")
    self.logger.debug("Task kafka_read exit")
492
def start(self):
    """
    Run the LCM: check the RO version, then run the kafka_read and kafka_ping tasks until both finish.
    Afterwards the event loop is closed and database, message bus and storage are disconnected
    :return: None
    """
    run_until_complete = self.loop.run_until_complete

    # check RO version
    run_until_complete(self.check_RO_version())

    main_tasks = asyncio.gather(self.kafka_read(), self.kafka_ping())
    run_until_complete(main_tasks)
    # TODO
    # self.logger.debug("Terminating cancelling creation tasks")
    # self.lcm_tasks.cancel("ALL", "create")
    # timeout = 200
    # while self.is_pending_tasks():
    #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
    #     await asyncio.sleep(2, loop=self.loop)
    #     timeout -= 2
    #     if not timeout:
    #         self.lcm_tasks.cancel("ALL", "ALL")
    self.loop.close()
    self.loop = None

    # disconnect, in this order, whatever was connected at __init__
    connections = (
        (self.db, "db_disconnect"),
        (self.msg, "disconnect"),
        (self.msg_admin, "disconnect"),
        (self.fs, "fs_disconnect"),
    )
    for connection, disconnect_method in connections:
        if connection:
            getattr(connection, disconnect_method)()
522
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and overwrite it with the environment variables
    OSMLCM_<section>_<item>. Legacy names of the VCA parameters are converted to the current ones
    :param config_file: path of the configuration file
    :return: two level dictionary with the configuration. The sections 'global', 'timeout', 'RO', 'VCA',
        'database', 'storage' and 'message' are always present, at least as an empty dictionary
    On any error reading or processing the file, it is logged and the process exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        # read file as yaml format
        with open(config_file) as f:
            # NOTE(review): yaml.Loader is able to build arbitrary python objects. It is kept because the
            # configuration file is assumed to be trusted; consider yaml.safe_load if it could not be
            conf = yaml.load(f, Loader=yaml.Loader)
        if not isinstance(conf, dict):
            # an empty file is loaded as None; fail with a meaningful text instead of an AttributeError
            raise ValueError("content is empty or it is not a dictionary")
        # Ensure all sections are not empty
        for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
            if not conf.get(k):
                conf[k] = {}

        # read all environ that starts with OSMLCM_
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            # OSMLCM_<subject>_<item>. 'item' can contain more underscores
            subject, _, item = k[7:].lower().partition("_")
            if not item:
                continue
            if subject in ("ro", "vca"):
                # put in capital letter
                subject = subject.upper()
            try:
                if item == "port" or subject == "timeout":
                    conf[subject][item] = int(v)
                else:
                    conf[subject][item] = v
            except Exception as e:
                # e.g. the section does not exist or the value is not an integer
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        # backward compatibility of VCA parameters
        vca_conf = conf["VCA"]
        legacy_vca_names = (
            ('pubkey', 'public_key'),
            ('cacert', 'ca_cert'),
            ('apiproxy', 'api_proxy'),
            ('enableosupgrade', 'enable_os_upgrade'),
            ('aptmirror', 'apt_mirror'),
        )
        for legacy_name, current_name in legacy_vca_names:
            if legacy_name in vca_conf:
                vca_conf[current_name] = vca_conf.pop(legacy_name)

        # 'enable_os_upgrade' arrives as text when it is provided by an environment variable
        if isinstance(vca_conf.get('enable_os_upgrade'), str):
            if vca_conf['enable_os_upgrade'].lower() == 'false':
                vca_conf['enable_os_upgrade'] = False
            elif vca_conf['enable_os_upgrade'].lower() == 'true':
                vca_conf['enable_os_upgrade'] = True

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        # sys.exit instead of the 'exit' helper of the site module, which is intended for interactive use
        sys.exit(1)
578
579 @staticmethod
580 def get_process_id():
581 """
582 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
583 will provide a random one
584 :return: Obtained ID
585 """
586 # Try getting docker id. If fails, get pid
587 try:
588 with open("/proc/self/cgroup", "r") as f:
589 text_id_ = f.readline()
590 _, _, text_id = text_id_.rpartition("/")
591 text_id = text_id.replace('\n', '')[:12]
592 if text_id:
593 return text_id
594 except Exception:
595 pass
596 # Return a random id
597 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
598
599
600 def _get_version():
601 """
602 Try to get version from package using pkg_resources (available with setuptools)
603 """
604 global lcm_version
605 try:
606 from pkg_resources import get_distribution
607 lcm_version = get_distribution("osm_lcm").version
608 except Exception:
609 pass
610
611
def usage():
    """
    Print at standard output the help of the command line options
    """
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """
    print(help_text.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
620
621
def health_check():
    """
    Check if LCM is healthy, inspecting the time of the last received ping stored at health_check_file
    (written by Lcm.kafka_read_callback). It is tried twice, with 6 seconds in between.
    It never returns: the process exits with code 0 when the last ping is more recent than
    Lcm.ping_interval_pace + 10 seconds, and with code 1 otherwise
    """
    retry = 2
    while retry:
        retry -= 1
        try:
            with open(health_check_file, "r") as f:
                last_received_ping = f.read()

            if time() - float(last_received_ping) < Lcm.ping_interval_pace + 10:
                # sys.exit instead of the 'exit' helper of the site module, intended for interactive use
                sys.exit(0)
        except (OSError, ValueError):
            # file missing or not readable, or content that is not a number: considered as not healthy
            pass
        if retry:
            sleep(6)
    sys.exit(1)
637
638
if __name__ == '__main__':

    try:
        print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add  "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # it does not return: the process exits with the result of the check
                health_check()
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                # explicit error instead of 'assert', that is removed when running with python -O
                raise getopt.GetoptError("Unhandled option", o)

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                sys.exit(1)
        else:
            # look for the configuration file beside this module, at the current folder and at /etc/osm
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                sys.exit(1)
        # get version from package and update global lcm_version
        _get_version()
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)