eaf2558533f922ef6746a916517a62628f7b64a1
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from os import environ, path
46 from random import choice as random_choice
47 from n2vc import version as n2vc_version
48
# Stop in the debugger as soon as the module is loaded when OSMLCM_PDB_DEBUG is defined (any value)
if os.getenv("OSMLCM_PDB_DEBUG") is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# Minimum versions of RO and N2VC accepted by the checks done in the Lcm class
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

# Minimum version of osm_common accepted by the check done in the Lcm class
min_common_version = "0.1.19"
# File where the time of the last received ping is stored; it is also handed to the --health-check option
# TODO find better location for this file
health_check_file = "{}/time_last_ping".format(path.expanduser("~"))
59
60
class Lcm:
    """
    Life Cycle Manager engine. It loads the configuration, connects to database, storage and message bus,
    creates the per-topic managers (ns, netslice, vim, wim, sdn, k8scluster, k8srepo) and dispatches the
    messages read from the bus to them.
    """

    ping_interval_pace = 120  # seconds between pings once it is confirmed that pings are being received
    ping_interval_boot = 5  # seconds between pings while no ping has been received yet
    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
    # ^ contains for each section at lcm.cfg the used logger name

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the configuration file, read with read_config_file(). Sections 'global',
            'RO', 'database', 'storage' and 'message' are used here; 'tsdb' is optional
        :param loop: asyncio event loop to use. If not provided asyncio.get_event_loop() is used
        :raises LcmException: on incompatible library version, invalid driver or connection failure
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        ro_section = config["RO"]
        ro_config = {
            "ng": ro_section.get("ng", False),
            "uri": ro_section.get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": ro_section.get("loglevel", "ERROR"),
        }
        if not ro_config["uri"]:
            # build the uri from host and port; the path depends on which RO flavour is configured
            uri_template = "http://{}:{}/ro" if ro_config["ng"] else "http://{}:{}/openmano"
            ro_config["uri"] = uri_template.format(ro_section["host"], ro_section["port"])
        self.config["ro_config"] = ro_config

        self.loop = loop or asyncio.get_event_loop()

        self._setup_logging(config)
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        self._check_library_versions()
        self._connect_backends(config)

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
            if self.config["tsdb"]["driver"] == "prometheus":
                self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.db, self.loop)
            else:
                raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
                    config["tsdb"]["driver"]))
        else:
            self.prometheus = None
        common_args = (self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.ns = ns.NsLcm(*common_args, self.prometheus)
        self.netslice = netslice.NetsliceLcm(*common_args, self.ns)
        self.vim = vim_sdn.VimLcm(*common_args)
        self.wim = vim_sdn.WimLcm(*common_args)
        self.sdn = vim_sdn.SdnLcm(*common_args)
        self.k8scluster = vim_sdn.K8sClusterLcm(*common_args)
        self.k8srepo = vim_sdn.K8sRepoLcm(*common_args)

    def _setup_logging(self, config):
        """
        Attach handlers and levels to the 'lcm' logger and to the logger of each section of cfg_logger_name
        :param config: loaded configuration; 'logger_name' is written into each section found
        :return: None
        """
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')

        def rotating_handler(logfile):
            handler = logging.handlers.RotatingFileHandler(logfile, maxBytes=100e6, backupCount=9, delay=0)
            handler.setFormatter(log_formatter_simple)
            return handler

        global_config = config["global"]
        if global_config.get("logfile"):
            self.logger.addHandler(rotating_handler(global_config["logfile"]))
        if not global_config.get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if global_config.get("loglevel"):
            self.logger.setLevel(global_config["loglevel"])

        # logging other modules
        for section_name, logname in self.cfg_logger_name.items():
            section = config.get(section_name)
            if section is None:
                # optional section (e.g. 'tsdb') not present at the configuration: nothing to configure
                continue
            section["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if section.get("logfile"):
                logger_module.addHandler(rotating_handler(section["logfile"]))
            if section.get("loglevel"):
                logger_module.setLevel(section["loglevel"])

    @staticmethod
    def _check_library_versions():
        """
        Check that the installed N2VC and osm_common libraries are not older than the minimum required
        :raises LcmException: if any of them is older
        :return: None
        """
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

    def _connect_backends(self, config):
        """
        Create and connect self.db, self.fs, self.msg and self.msg_admin according to the configured drivers
        :param config: loaded configuration with sections 'database', 'storage' and 'message'
        :raises LcmException: on invalid driver, or wrapping any DbException, FsException or MsgException
        :return: None
        """
        try:
            # TODO check database version
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.DbMongo()
            elif db_driver == "memory":
                self.db = dbmemory.DbMemory()
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
            self.db.db_connect(config["database"])

            fs_driver = config["storage"]["driver"]
            if fs_driver == "local":
                self.fs = fslocal.FsLocal()
            elif fs_driver == "mongo":
                self.fs = fsmongo.FsMongo()
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
            self.fs.fs_connect(config["storage"])

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            msg_driver = config_message["driver"]
            if msg_driver == "local":
                msg_class = msglocal.MsgLocal
            elif msg_driver == "kafka":
                msg_class = msgkafka.MsgKafka
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
            self.msg = msg_class()
            self.msg.connect(config_message)
            self.msg_admin = msg_class()
            config_message.pop("group_id", None)
            self.msg_admin.connect(config_message)
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

    async def check_RO_version(self):
        """
        Ask RO for its version and check it is not older than min_RO_version. On connection error it retries
        every 5 seconds, up to 14 attempts
        :raises LcmException: if RO version is too old or RO cannot be contacted after all the attempts
        :return: None
        """
        tries = 14
        last_error = None
        while True:
            try:
                client_class = NgRoClient if self.config["ro_config"].get("ng") else ROClient
                ro_server = client_class(self.loop, **self.config["ro_config"])
                ro_version = await ro_server.get_version()
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {}".format(ro_version))
                return
            except (ROClientException, NgRoException) as e:
                tries -= 1
                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    # log only when the error changes, to avoid repeating the same line at every retry
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        """
        Task used only for testing. It just logs the received parameter
        :param param: anything, it is only logged
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    @staticmethod
    def _max_consecutive_errors(first_start):
        """
        Provide how many consecutive errors are tolerated before giving up
        :param first_start: True while no iteration has succeeded yet. More errors are tolerated then, to
            leave time for kafka to start
        :return: 30 if first_start, 8 otherwise
        """
        return 30 if first_start else 8

    async def kafka_ping(self):
        """
        Endless task that writes a ping at 'admin' topic addressed to this same worker, and checks that
        pings are being received back (self.pings_not_received is set to 0 by kafka_read_callback)
        :raises LcmException: when more than 10 pings in a row have not been received
        :return: never returns normally
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # time between pings are low when it is not received and at starting
                wait_time = self.ping_interval_pace if kafka_has_received else self.ping_interval_boot
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                # 'loop' argument is not passed: it is not accepted by asyncio.sleep since python 3.10
                await asyncio.sleep(wait_time)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # the limit must be selected first and then compared. Without the parentheses python parses
                # "a == 8 if cond else 30" as "(a == 8) if cond else 30", which is always true at first start
                if consecutive_errors >= self._max_consecutive_errors(first_start):
                    self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 5 if first_start else 2
                await asyncio.sleep(wait_time)

    def _launch_task(self, topic, item_id, op_id, task_name, coro):
        """
        Schedule a coroutine and register the resulting task at self.lcm_tasks
        :param topic: topic used for registering the task
        :param item_id: identity of the item the task works on
        :param op_id: operation id (or order id) of the task
        :param task_name: name used for registering the task
        :param coro: coroutine object to schedule
        :return: None
        """
        task = asyncio.ensure_future(coro)
        self.lcm_tasks.register(topic, item_id, op_id, task_name, task)

    def _registered_tasks(self, topic, item_id):
        """
        Best-effort lookup of the tasks registered for an item. Only used for printing at 'show' command
        :param topic: topic of the item
        :param item_id: identity of the item
        :return: whatever is stored for this item, or None if it cannot be obtained
        """
        # NOTE(review): assumes TaskRegistry exposes 'task_registry' as {topic: {item_id: tasks}}; confirm
        # against osm_lcm.lcm_utils. If the attribute does not exist this simply returns None
        registry = getattr(self.lcm_tasks, "task_registry", None) or {}
        return (registry.get(topic) or {}).get(item_id)

    def _show_record(self, label, table, topic, record_id):
        """
        Print to stdout the status of a database record and its registered tasks ('show' command)
        :param label: text used at the printed output: "nsr" or "nsir"
        :param table: database table where the record is looked for
        :param topic: topic used to look for the registered tasks
        :param record_id: _id of the record
        :return: None
        """
        try:
            record = self.db.get_one(table, {"_id": record_id})
            print("{}:\n _id={}\n operational-status: {}\n config-status: {}"
                  "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                  "".format(label, record_id, record["operational-status"], record["config-status"],
                            record["detailed-status"],
                            record["_admin"]["deployed"], self._registered_tasks(topic, record_id)))
        except Exception as e:
            print("{} {} not found: {}".format(label, record_id, e))
        sys.stdout.flush()

    def _on_k8s(self, topic, manager, command, params, order_id):
        """
        Process a command of topics 'k8scluster' and 'k8srepo'
        :return: True if the command has been processed, False if it is unknown
        """
        if command in ("create", "created"):
            operation = "create"
        elif command in ("delete", "deleted"):
            operation = "delete"
        else:
            return False
        item_id = params.get("_id")
        if topic == "k8srepo" and operation == "create":
            self.logger.debug("k8srepo_id = {}".format(item_id))
        self._launch_task(topic, item_id, order_id, "{}_{}".format(topic, operation),
                          getattr(manager, operation)(params, order_id))
        return True

    def _on_instance(self, topic, manager, operations, instance_key, label, table, command, params):
        """
        Process a command of topics 'ns' and 'nsi'
        :param operations: commands that launch the method of 'manager' with the same name
        :param instance_key: key of params that contains the identity of the instance record
        :param label: text used by 'show' command: "nsr" or "nsir"
        :param table: database table used by 'show' command
        :return: True if the command has been processed, False if it is unknown
        """
        if command in operations:
            op_id = params["_id"]
            instance_id = params[instance_key]
            if command == "terminate":
                # cancel the tasks in progress over this instance before terminating it
                self.lcm_tasks.cancel(topic, instance_id)
            self._launch_task(topic, instance_id, op_id, "{}_{}".format(topic, command),
                              getattr(manager, command)(instance_id, op_id))
            return True
        if command == "show":
            self._show_record(label, table, topic, params)
            return True
        # "deleted": TODO cleaning of task just in case should be done
        # the others are notifications of finished operations, nothing to do.   "scaled-cooldown-time"
        return command in ("deleted", "terminated", "instantiated", "scaled", "actioned")

    def _on_account(self, topic, manager, prefix, has_show, command, params, order_id):
        """
        Process a command of topics 'vim_account', 'wim_account' and 'sdn'
        :param prefix: prefix of the task name: "vim", "wim" or "sdn"
        :param has_show: True if 'show' command is accepted for this topic
        :return: True if the command has been processed, False if it is unknown
        """
        item_id = params["_id"]
        if command in ("create", "created"):
            operation = "create"
        elif command in ("delete", "deleted"):
            operation = "delete"
            self.lcm_tasks.cancel(topic, item_id)
        elif command in ("edit", "edited"):
            operation = "edit"
        elif has_show and command == "show":
            print("not implemented show with {}".format(topic))
            sys.stdout.flush()
            return True
        else:
            return False
        self._launch_task(topic, item_id, order_id, "{}_{}".format(prefix, operation),
                          getattr(manager, operation)(params, order_id))
        return True

    def kafka_read_callback(self, topic, command, params):
        """
        Callback invoked for every message read from the bus. It resets the error counters used by
        kafka_read and launches the task, or does the action, that corresponds to topic and command
        :param topic: topic of the message
        :param command: command (key) of the message
        :param params: content of the message
        :raises LcmExceptionExit: when command is "exit"
        :return: None
        """
        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        self.consecutive_errors = 0
        self.first_start = False
        # the original code sets order_id to 1 and increments it once per message, so it is always 2
        order_id = 2
        if command == "exit":
            raise LcmExceptionExit
        if command.startswith("#"):
            return
        if command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        if command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    # best effort: a failure writing this file must not stop the processing of messages
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        if topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return

        if topic == "k8scluster":
            handled = self._on_k8s(topic, self.k8scluster, command, params, order_id)
        elif topic == "k8srepo":
            handled = self._on_k8s(topic, self.k8srepo, command, params, order_id)
        elif topic == "ns":
            handled = self._on_instance(topic, self.ns, ("instantiate", "terminate", "action", "scale"),
                                        "nsInstanceId", "nsr", "nsrs", command, params)
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            handled = self._on_instance(topic, self.netslice, ("instantiate", "terminate"),
                                        "netsliceInstanceId", "nsir", "nsirs", command, params)
        elif topic == "vim_account":
            handled = self._on_account(topic, self.vim, "vim", True, command, params, order_id)
        elif topic == "wim_account":
            handled = self._on_account(topic, self.wim, "wim", True, command, params, order_id)
        elif topic == "sdn":
            handled = self._on_account(topic, self.sdn, "sdn", False, command, params, order_id)
        else:
            handled = False
        if not handled:
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        """
        Task that reads from the message bus, both the working topics and the 'admin' topic, delivering
        every message to kafka_read_callback. On error it retries while there are less than 10 consecutive
        errors (the counter is reset by kafka_read_callback at every received message)
        :return: None, when "exit" command is received or when the loop ends because of consecutive errors
        """
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # the limit must be selected first and then compared. Without the parentheses python parses
                # "a == 8 if cond else 30" as "(a == 8) if cond else 30", which is always true at first start
                if self.consecutive_errors >= self._max_consecutive_errors(self.first_start):
                    self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 5 if self.first_start else 2
                # 'loop' argument is not passed: it is not accepted by asyncio.sleep since python 3.10
                await asyncio.sleep(wait_time)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    async def _run_tasks(self):
        """
        Run concurrently the reading and the ping tasks until both end or one of them raises
        :return: None
        """
        await asyncio.gather(self.kafka_read(), self.kafka_ping())

    def start(self):
        """
        Blocking entry point: checks RO version, starts prometheus if configured, runs the bus tasks until
        they end, and finally closes the loop and disconnects from database, messaging and storage
        :return: None
        """
        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        # configure tsdb prometheus
        if self.prometheus:
            self.loop.run_until_complete(self.prometheus.start())

        # gather is awaited inside a coroutine so that the tasks are created at self.loop
        self.loop.run_until_complete(self._run_tasks())
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        """
        Load the configuration from a yaml file and override it with the environment variables named
        OSMLCM_<section>_<item>. On any error it logs a critical message and exits the process with code 1
        :param config_file: path of the configuration file
        :return: two level dictionary with the configuration
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            # read file as yaml format
            with open(config_file) as f:
                # NOTE(review): yaml.Loader can build arbitrary python objects. It is kept to not change
                # the accepted syntax; consider yaml.SafeLoader if this file may come from an untrusted source
                conf = yaml.load(f, Loader=yaml.Loader)
            # Ensure all sections are not empty
            for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
                if not conf.get(k):
                    conf[k] = {}

            # read all environ that starts with OSMLCM_
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # put in capital letter
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            # backward compatibility of VCA parameters
            vca = conf["VCA"]
            for old_name, new_name in (("pubkey", "public_key"), ("cacert", "ca_cert"), ("apiproxy", "api_proxy"),
                                       ("enableosupgrade", "enable_os_upgrade")):
                if old_name in vca:
                    vca[new_name] = vca.pop(old_name)

            # 'enable_os_upgrade' is a string when it comes from the environment; convert it to boolean
            if isinstance(vca.get('enable_os_upgrade'), str):
                text_value = vca['enable_os_upgrade'].lower()
                if text_value == 'false':
                    vca['enable_os_upgrade'] = False
                elif text_value == 'true':
                    vca['enable_os_upgrade'] = True

            if 'aptmirror' in vca:
                vca['apt_mirror'] = vca.pop('aptmirror')

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            sys.exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
        will provide a random one
        :return: Obtained ID
        """
        # Try getting docker id: up to 12 characters after the last "/" of the first line of the cgroup file
        try:
            with open("/proc/self/cgroup", "r") as f:
                first_line = f.readline()
                text_id = first_line.rpartition("/")[2].replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            # best effort: on any failure fall through to a random id
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
622
623
def usage():
    """
    Print to stdout the command line help of this program
    :return: None
    """
    help_lines = (
        "Usage: {} [options]",
        "-c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)",
        "--health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy",
        "-h|--help: shows this help",
        "",
    )
    print("\n".join(help_lines).format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
632
633
if __name__ == '__main__':
    # Command line entry point: parses the options, locates the configuration file and runs the Lcm engine.
    # Exit code is 1 on wrong options, missing configuration file or LcmException

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # imported here so that it is only loaded when this option is used
                from osm_lcm.lcm_hc import health_check
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                # 'assert' is not used because it is removed when running with -O
                raise getopt.GetoptError("Unhandled option '{}'".format(o), o)

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                sys.exit(1)
        else:
            # look for a '.cfg' file with the name of this file, then at local folder and then at /etc/osm
            candidates = (path.splitext(__file__)[0] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg")
            for config_file in candidates:
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/",
                      file=sys.stderr)
                sys.exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)