1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from os import environ, path
46 from random import choice as random_choice
47 from n2vc import version as n2vc_version
48
49 if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
50 pdb.set_trace()
51
52
53 __author__ = "Alfonso Tierno"
54 min_RO_version = "6.0.2"
55 min_n2vc_version = "0.0.2"
56
57 min_common_version = "0.1.19"
58 health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file
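# This file acts as a simple liveness marker: kafka_read_callback writes the current timestamp into it
# every time LCM receives one of its own pings, and the '--health-check' command line mode passes this
# path together with ping_interval_pace to osm_lcm.lcm_hc.health_check (presumably to verify that the
# timestamp is recent enough)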
59
60
61 class Lcm:
62
63 ping_interval_pace = 120  # how often (seconds) a ping is sent once everything is confirmed to be running
64 ping_interval_boot = 5  # how often (seconds) a ping is sent while booting
65
66 def __init__(self, config_file, loop=None):
67 """
68 Init: connect to database, filesystem storage and messaging
69 :param config_file: path to a two-level YAML configuration file whose top level should contain the 'global', 'RO', 'VCA', 'database', 'storage', 'message' and 'timeout' sections
70 :return: None
71 """
72 self.db = None
73 self.msg = None
74 self.msg_admin = None
75 self.fs = None
76 self.pings_not_received = 1
77 self.consecutive_errors = 0
78 self.first_start = False
79
80 # logging
81 self.logger = logging.getLogger('lcm')
82 # get id
83 self.worker_id = self.get_process_id()
84 # load configuration
85 config = self.read_config_file(config_file)
86 self.config = config
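# 'ng' selects the next-generation RO API: when set, check_RO_version uses NgRoClient and the /ro
# endpoint; otherwise the legacy ROClient and its /openmano endpoint are used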
87 self.config["ro_config"] = {
88 "ng": config["RO"].get("ng", False),
89 "uri": config["RO"].get("uri"),
90 "tenant": config.get("tenant", "osm"),
91 "logger_name": "lcm.roclient",
92 "loglevel": config["RO"].get("loglevel", "ERROR"),
93 }
94 if not self.config["ro_config"]["uri"]:
95 if not self.config["ro_config"]["ng"]:
96 self.config["ro_config"]["uri"] = "http://{}:{}/openmano".format(config["RO"]["host"],
97 config["RO"]["port"])
98 else:
99 self.config["ro_config"]["uri"] = "http://{}:{}/ro".format(config["RO"]["host"], config["RO"]["port"])
100
101 self.loop = loop or asyncio.get_event_loop()
102
103 # logging
104 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
105 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
106 config["database"]["logger_name"] = "lcm.db"
107 config["storage"]["logger_name"] = "lcm.fs"
108 config["message"]["logger_name"] = "lcm.msg"
109 if config["global"].get("logfile"):
110 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
111 maxBytes=100e6, backupCount=9, delay=0)
112 file_handler.setFormatter(log_formatter_simple)
113 self.logger.addHandler(file_handler)
114 if not config["global"].get("nologging"):
115 str_handler = logging.StreamHandler()
116 str_handler.setFormatter(log_formatter_simple)
117 self.logger.addHandler(str_handler)
118
119 if config["global"].get("loglevel"):
120 self.logger.setLevel(config["global"]["loglevel"])
121
122 # logging other modules
123 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
124 config[k1]["logger_name"] = logname
125 logger_module = logging.getLogger(logname)
126 if config[k1].get("logfile"):
127 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
128 maxBytes=100e6, backupCount=9, delay=0)
129 file_handler.setFormatter(log_formatter_simple)
130 logger_module.addHandler(file_handler)
131 if config[k1].get("loglevel"):
132 logger_module.setLevel(config[k1]["loglevel"])
133 self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))
134
135 # check version of N2VC
136 # TODO enhance with int conversion or from distutils.version import LooseVersion
137 # or with list(map(int, version.split(".")))
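# (comparing version components as strings would, for example, rank "9" above "10", hence the
# suggestion to convert them to int)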
138 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
139 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
140 n2vc_version, min_n2vc_version))
141 # check version of common
142 if versiontuple(common_version) < versiontuple(min_common_version):
143 raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
144 common_version, min_common_version))
145
146 try:
147 # TODO check database version
148 if config["database"]["driver"] == "mongo":
149 self.db = dbmongo.DbMongo()
150 self.db.db_connect(config["database"])
151 elif config["database"]["driver"] == "memory":
152 self.db = dbmemory.DbMemory()
153 self.db.db_connect(config["database"])
154 else:
155 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
156 config["database"]["driver"]))
157
158 if config["storage"]["driver"] == "local":
159 self.fs = fslocal.FsLocal()
160 self.fs.fs_connect(config["storage"])
161 elif config["storage"]["driver"] == "mongo":
162 self.fs = fsmongo.FsMongo()
163 self.fs.fs_connect(config["storage"])
164 else:
165 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
166 config["storage"]["driver"]))
167
168 # copy message configuration in order to remove 'group_id' for msg_admin
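# (without a group_id the admin consumer is not load-balanced across LCM workers, so each instance
# receives the ping messages addressed to itself)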
169 config_message = config["message"].copy()
170 config_message["loop"] = self.loop
171 if config_message["driver"] == "local":
172 self.msg = msglocal.MsgLocal()
173 self.msg.connect(config_message)
174 self.msg_admin = msglocal.MsgLocal()
175 config_message.pop("group_id", None)
176 self.msg_admin.connect(config_message)
177 elif config_message["driver"] == "kafka":
178 self.msg = msgkafka.MsgKafka()
179 self.msg.connect(config_message)
180 self.msg_admin = msgkafka.MsgKafka()
181 config_message.pop("group_id", None)
182 self.msg_admin.connect(config_message)
183 else:
184 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
185 config["message"]["driver"]))
186 except (DbException, FsException, MsgException) as e:
187 self.logger.critical(str(e), exc_info=True)
188 raise LcmException(str(e))
189
190 # registry of created tasks/futures so that they can be tracked and cancelled
191 self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
192
193 if self.config.get("prometheus"):
194 self.prometheus = prometheus.Prometheus(self.config["prometheus"], self.worker_id, self.db, self.loop)
195 else:
196 self.prometheus = None
197 self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus)
198 self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop,
199 self.ns)
200 self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
201 self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
202 self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
203 self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
204 self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
205
206 async def check_RO_version(self):
207 tries = 14
208 last_error = None
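# RO is polled up to 14 times with a 5-second pause between attempts (roughly 70 seconds in total)
# before giving up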
209 while True:
210 try:
211 if self.config["ro_config"].get("ng"):
212 ro_server = NgRoClient(self.loop, **self.config["ro_config"])
213 else:
214 ro_server = ROClient(self.loop, **self.config["ro_config"])
215 ro_version = await ro_server.get_version()
216 if versiontuple(ro_version) < versiontuple(min_RO_version):
217 raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
218 ro_version, min_RO_version))
219 self.logger.info("Connected to RO version {}".format(ro_version))
220 return
221 except (ROClientException, NgRoException) as e:
222 tries -= 1
223 error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
224 if tries <= 0:
225 self.logger.critical(error_text)
226 raise LcmException(error_text)
227 if last_error != error_text:
228 last_error = error_text
229 self.logger.error(error_text + ". Retrying for up to {} more seconds".format(5*tries))
230 await asyncio.sleep(5)
231
232 async def test(self, param=None):
233 self.logger.debug("Starting/Ending test task: {}".format(param))
234
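# kafka_ping publishes a self-addressed "ping" on the admin topic every ping_interval_boot seconds until
# the first ping comes back, and every ping_interval_pace seconds afterwards. kafka_read_callback resets
# self.pings_not_received when the ping is received back; if more than 10 consecutive pings are lost,
# the Kafka bus is considered broken and an LcmException is raised.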
235 async def kafka_ping(self):
236 self.logger.debug("Task kafka_ping Enter")
237 consecutive_errors = 0
238 first_start = True
239 kafka_has_received = False
240 self.pings_not_received = 1
241 while True:
242 try:
243 await self.msg_admin.aiowrite(
244 "admin", "ping",
245 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
246 self.loop)
247 # the time between pings is short while no ping has been received yet and at startup
248 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
249 if not self.pings_not_received:
250 kafka_has_received = True
251 self.pings_not_received += 1
252 await asyncio.sleep(wait_time, loop=self.loop)
253 if self.pings_not_received > 10:
254 raise LcmException("It is not receiving pings from Kafka bus")
255 consecutive_errors = 0
256 first_start = False
257 except LcmException:
258 raise
259 except Exception as e:
260 # during the first start (first_start is True) more consecutive errors are tolerated
261 # and a longer wait is used below, to give Kafka time to come up
262 if consecutive_errors == (8 if not first_start else 30):
263 self.logger.error("Task kafka_ping exiting: too many consecutive errors. Exception: {}".format(e))
264 raise
265 consecutive_errors += 1
266 self.logger.error("Task kafka_ping retrying after exception: {}".format(e))
267 wait_time = 2 if not first_start else 5
268 await asyncio.sleep(wait_time, loop=self.loop)
269
270 def kafka_read_callback(self, topic, command, params):
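# Dispatches every received message to the corresponding LCM module (ns, netslice, vim_sdn, k8scluster,
# k8srepo, ...) and registers each long-running operation in self.lcm_tasks so that it can be tracked
# and cancelled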
271 order_id = 1
272
273 if topic != "admin" and command != "ping":
274 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
275 self.consecutive_errors = 0
276 self.first_start = False
277 order_id += 1
278 if command == "exit":
279 raise LcmExceptionExit
280 elif command.startswith("#"):
281 return
282 elif command == "echo":
283 # just for test
284 print(params)
285 sys.stdout.flush()
286 return
287 elif command == "test":
288 asyncio.Task(self.test(params), loop=self.loop)
289 return
290
291 if topic == "admin":
292 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
293 if params.get("worker_id") != self.worker_id:
294 return
295 self.pings_not_received = 0
296 try:
297 with open(health_check_file, "w") as f:
298 f.write(str(time()))
299 except Exception as e:
300 self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
301 return
302 elif topic == "pla":
303 if command == "placement":
304 self.ns.update_nsrs_with_pla_result(params)
305 return
306 elif topic == "k8scluster":
307 if command == "create" or command == "created":
308 k8scluster_id = params.get("_id")
309 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
310 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
311 return
312 elif command == "delete" or command == "deleted":
313 k8scluster_id = params.get("_id")
314 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
315 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
316 return
317 elif topic == "k8srepo":
318 if command == "create" or command == "created":
319 k8srepo_id = params.get("_id")
320 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
321 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
322 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
323 return
324 elif command == "delete" or command == "deleted":
325 k8srepo_id = params.get("_id")
326 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
327 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
328 return
329 elif topic == "ns":
330 if command == "instantiate":
331 # self.logger.debug("Deploying NS {}".format(nsr_id))
332 nslcmop = params
333 nslcmop_id = nslcmop["_id"]
334 nsr_id = nslcmop["nsInstanceId"]
335 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
336 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
337 return
338 elif command == "terminate":
339 # self.logger.debug("Deleting NS {}".format(nsr_id))
340 nslcmop = params
341 nslcmop_id = nslcmop["_id"]
342 nsr_id = nslcmop["nsInstanceId"]
343 self.lcm_tasks.cancel(topic, nsr_id)
344 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
345 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
346 return
347 elif command == "action":
348 # self.logger.debug("Update NS {}".format(nsr_id))
349 nslcmop = params
350 nslcmop_id = nslcmop["_id"]
351 nsr_id = nslcmop["nsInstanceId"]
352 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
353 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
354 return
355 elif command == "scale":
356 # self.logger.debug("Update NS {}".format(nsr_id))
357 nslcmop = params
358 nslcmop_id = nslcmop["_id"]
359 nsr_id = nslcmop["nsInstanceId"]
360 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
361 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
362 return
363 elif command == "show":
364 nsr_id = params
365 try:
366 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
367 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
368 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
369 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
370 db_nsr["detailed-status"],
371 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
372 except Exception as e:
373 print("nsr {} not found: {}".format(nsr_id, e))
374 sys.stdout.flush()
375 return
376 elif command == "deleted":
377 return # TODO cleaning of task just in case should be done
378 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
379 return
380 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
381 if command == "instantiate":
382 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
383 nsilcmop = params
384 nsilcmop_id = nsilcmop["_id"] # slice operation id
385 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
386 task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
387 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
388 return
389 elif command == "terminate":
390 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
391 nsilcmop = params
392 nsilcmop_id = nsilcmop["_id"] # slice operation id
393 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
394 self.lcm_tasks.cancel(topic, nsir_id)
395 task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
396 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
397 return
398 elif command == "show":
399 nsir_id = params
400 try:
401 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
402 print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
403 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
404 "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
405 db_nsir["detailed-status"],
406 db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
407 except Exception as e:
408 print("nsir {} not found: {}".format(nsir_id, e))
409 sys.stdout.flush()
410 return
411 elif command == "deleted":
412 return # TODO cleaning of task just in case should be done
413 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
414 return
415 elif topic == "vim_account":
416 vim_id = params["_id"]
417 if command in ("create", "created"):
418 task = asyncio.ensure_future(self.vim.create(params, order_id))
419 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
420 return
421 elif command == "delete" or command == "deleted":
422 self.lcm_tasks.cancel(topic, vim_id)
423 task = asyncio.ensure_future(self.vim.delete(params, order_id))
424 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
425 return
426 elif command == "show":
427 print("not implemented show with vim_account")
428 sys.stdout.flush()
429 return
430 elif command in ("edit", "edited"):
431 task = asyncio.ensure_future(self.vim.edit(params, order_id))
432 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
433 return
434 elif command == "deleted":
435 return # TODO cleaning of task just in case should be done
436 elif topic == "wim_account":
437 wim_id = params["_id"]
438 if command in ("create", "created"):
439 task = asyncio.ensure_future(self.wim.create(params, order_id))
440 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
441 return
442 elif command == "delete" or command == "deleted":
443 self.lcm_tasks.cancel(topic, wim_id)
444 task = asyncio.ensure_future(self.wim.delete(params, order_id))
445 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
446 return
447 elif command == "show":
448 print("not implemented show with wim_account")
449 sys.stdout.flush()
450 return
451 elif command in ("edit", "edited"):
452 task = asyncio.ensure_future(self.wim.edit(params, order_id))
453 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
454 return
455 elif command == "deleted":
456 return # TODO cleaning of task just in case should be done
457 elif topic == "sdn":
458 _sdn_id = params["_id"]
459 if command in ("create", "created"):
460 task = asyncio.ensure_future(self.sdn.create(params, order_id))
461 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
462 return
463 elif command == "delete" or command == "deleted":
464 self.lcm_tasks.cancel(topic, _sdn_id)
465 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
466 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
467 return
468 elif command in ("edit", "edited"):
469 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
470 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
471 return
472 elif command == "deleted":
473 return # TODO cleaning of task just in case should be done
474 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
475
476 async def kafka_read(self):
477 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
478 # future = asyncio.Future()
479 self.consecutive_errors = 0
480 self.first_start = True
481 while self.consecutive_errors < 10:
482 try:
483 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
484 topics_admin = ("admin", )
485 await asyncio.gather(
486 self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
487 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
488 )
489
490 except LcmExceptionExit:
491 self.logger.debug("Bye!")
492 break
493 except Exception as e:
494 # during the first start (first_start is True) more consecutive errors are tolerated
495 # and a longer wait is used below, to give Kafka time to come up
496 if self.consecutive_errors == (8 if not self.first_start else 30):
497 self.logger.error("Task kafka_read exiting: too many consecutive errors. Exception: {}".format(e))
498 raise
499 self.consecutive_errors += 1
500 self.logger.error("Task kafka_read retrying after exception: {}".format(e))
501 wait_time = 2 if not self.first_start else 5
502 await asyncio.sleep(wait_time, loop=self.loop)
503
504 # self.logger.debug("Task kafka_read terminating")
505 self.logger.debug("Task kafka_read exit")
506
507 def start(self):
508
509 # check RO version
510 self.loop.run_until_complete(self.check_RO_version())
511
512 # configure Prometheus
513 if self.prometheus:
514 self.loop.run_until_complete(self.prometheus.start())
515
516 self.loop.run_until_complete(asyncio.gather(
517 self.kafka_read(),
518 self.kafka_ping()
519 ))
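# blocks here until kafka_read and kafka_ping finish (exit command received) or one of them raises;
# on a normal exit the event loop and the database/message/storage connections are closed below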
520 # TODO
521 # self.logger.debug("Terminating cancelling creation tasks")
522 # self.lcm_tasks.cancel("ALL", "create")
523 # timeout = 200
524 # while self.is_pending_tasks():
525 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
526 # await asyncio.sleep(2, loop=self.loop)
527 # timeout -= 2
528 # if not timeout:
529 # self.lcm_tasks.cancel("ALL", "ALL")
530 self.loop.close()
531 self.loop = None
532 if self.db:
533 self.db.db_disconnect()
534 if self.msg:
535 self.msg.disconnect()
536 if self.msg_admin:
537 self.msg_admin.disconnect()
538 if self.fs:
539 self.fs.fs_disconnect()
540
541 def read_config_file(self, config_file):
542 # TODO make a combined [ini] + yaml parser
543 # the configparser library is not suitable because it does not allow comments at the end of a line
544 # and does not parse integers or booleans
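# An illustrative sketch of the expected layout is shown below; the section names and keys are the ones
# this module reads, the values are examples only, and each section also carries the driver-specific
# connection parameters that are passed on to osm_common:
#
#   global:   {loglevel: DEBUG}                 # optional 'logfile' and 'nologging' are also honoured
#   RO:       {host: ro, port: 9090, ng: true}  # or a full 'uri'; 'ng' selects the new RO client
#   VCA:      {...}                             # N2VC/juju connection parameters
#   database: {driver: mongo, ...}              # 'mongo' or 'memory'
#   storage:  {driver: local, ...}              # 'local' or 'mongo'
#   message:  {driver: kafka, group_id: lcm-server, ...}   # 'kafka' or 'local'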
545 try:
546 # read file as yaml format
547 with open(config_file) as f:
548 conf = yaml.load(f, Loader=yaml.Loader)
549 # Ensure every expected section exists (missing or empty sections become an empty dict)
550 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
551 if not conf.get(k):
552 conf[k] = {}
553
554 # read all environment variables that start with OSMLCM_ and use them to override the file values
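# Illustrative examples of the mapping implemented below (variable values are hypothetical):
#   OSMLCM_DATABASE_HOST=mongo   -> conf["database"]["host"] = "mongo"
#   OSMLCM_RO_PORT=9090          -> conf["RO"]["port"] = 9090   (converted to int)
#   OSMLCM_VCA_PUBLIC_KEY=xxxx   -> conf["VCA"]["public_key"] = "xxxx"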
555 for k, v in environ.items():
556 if not k.startswith("OSMLCM_"):
557 continue
558 subject, _, item = k[7:].lower().partition("_")
559 if not item:
560 continue
561 if subject in ("ro", "vca"):
562 # the RO and VCA section names are upper-case in the configuration
563 subject = subject.upper()
564 try:
565 if item == "port" or subject == "timeout":
566 conf[subject][item] = int(v)
567 else:
568 conf[subject][item] = v
569 except Exception as e:
570 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
571
572 # backward compatibility of VCA parameters
573
574 if 'pubkey' in conf["VCA"]:
575 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
576 if 'cacert' in conf["VCA"]:
577 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
578 if 'apiproxy' in conf["VCA"]:
579 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
580
581 if 'enableosupgrade' in conf["VCA"]:
582 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
583 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
584 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
585 conf["VCA"]['enable_os_upgrade'] = False
586 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
587 conf["VCA"]['enable_os_upgrade'] = True
588
589 if 'aptmirror' in conf["VCA"]:
590 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
591
592 return conf
593 except Exception as e:
594 self.logger.critical("At config file '{}': {}".format(config_file, e))
595 exit(1)
596
597 @staticmethod
598 def get_process_id():
599 """
600 Obtain a unique ID for this process. If running inside a Docker container, the container ID is used;
601 otherwise a random ID is generated
602 :return: Obtained ID
603 """
604 # Try to get the Docker container ID; if that fails, fall back to a random ID
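# (the first line of /proc/self/cgroup ends in the cgroup path; inside a Docker container its last
# path component is the container ID, truncated here to the 12 characters of the docker short ID)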
605 try:
606 with open("/proc/self/cgroup", "r") as f:
607 text_id_ = f.readline()
608 _, _, text_id = text_id_.rpartition("/")
609 text_id = text_id.replace('\n', '')[:12]
610 if text_id:
611 return text_id
612 except Exception:
613 pass
614 # Return a random id
615 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
616
617
618 def usage():
619 print("""Usage: {} [options]
620 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
621 --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
622 -h|--help: shows this help
623 """.format(sys.argv[0]))
624 # --log-socket-host HOST: send logs to this host")
625 # --log-socket-port PORT: send logs using this port (default: 9022)")
626
627
628 if __name__ == '__main__':
629
630 try:
631 # print("SYS.PATH='{}'".format(sys.path))
632 # load parameters and configuration
633 # -h
634 # -c value
635 # --config value
636 # --help
637 # --health-check
638 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
639 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
640 config_file = None
641 for o, a in opts:
642 if o in ("-h", "--help"):
643 usage()
644 sys.exit()
645 elif o in ("-c", "--config"):
646 config_file = a
647 elif o == "--health-check":
648 from osm_lcm.lcm_hc import health_check
649 health_check(health_check_file, Lcm.ping_interval_pace)
650 # elif o == "--log-socket-port":
651 # log_socket_port = a
652 # elif o == "--log-socket-host":
653 # log_socket_host = a
654 # elif o == "--log-file":
655 # log_file = a
656 else:
657 assert False, "Unhandled option"
658
659 if config_file:
660 if not path.isfile(config_file):
661 print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
662 exit(1)
663 else:
664 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
665 if path.isfile(config_file):
666 break
667 else:
668 print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
669 exit(1)
670 lcm = Lcm(config_file)
671 lcm.start()
672 except (LcmException, getopt.GetoptError) as e:
673 print(str(e), file=sys.stderr)
674 # usage()
675 exit(1)