# Source: osm/LCM.git — osm_lcm/lcm.py (commit a3fa60942d353f782864958587f68e3c6abae34b)
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from os import environ, path
46 from random import choice as random_choice
47 from n2vc import version as n2vc_version
48
# Break into pdb at import time when the OSMLCM_PDB_DEBUG environment variable is set
if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"  # minimum compatible Resource Orchestrator version
min_n2vc_version = "0.0.2"  # minimum compatible N2VC version

min_common_version = "0.1.19"  # minimum compatible osm_common version
# File touched each time a self-ping is received back from kafka; read by the
# --health-check command-line probe to decide whether this LCM is alive.
health_check_file = path.expanduser("~") + "/time_last_ping"  # TODO find better location for this file
59
60
class Lcm:
    """Life Cycle Manager.

    Listens to the kafka bus and launches NS/NSI/VIM/WIM/SDN/K8s lifecycle
    operations, registering every spawned asyncio task so it can be cancelled.
    """

    ping_interval_pace = 120  # seconds between pings once the kafka bus is confirmed to be working
    ping_interval_boot = 5  # seconds between pings while booting (no ping received back yet)
    # maps each section of lcm.cfg to the logger name used for it
    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the configuration file. Its content is a two level dictionary;
            top level should contain 'database', 'storage', 'message', 'RO', 'global', ...
        :param loop: asyncio event loop to use; when None the default loop is taken
        :return: None
        :raise LcmException: on invalid configuration, incompatible component versions, or
            database/storage/message connection errors
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        # normalized RO connection parameters, consumed by both ROClient and NgRoClient
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            # no explicit uri configured: build it from host/port; the path depends on
            # classic RO (/openmano) vs new-generation RO (/ro)
            if not self.config["ro_config"]["ng"]:
                self.config["ro_config"]["uri"] = "http://{}:{}/openmano".format(config["RO"]["host"],
                                                                                config["RO"]["port"])
            else:
                self.config["ro_config"]["uri"] = "http://{}:{}/ro".format(config["RO"]["host"], config["RO"]["port"])

        self.loop = loop or asyncio.get_event_loop()

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules: each configured section can define its own logfile/loglevel
        for k1, logname in self.cfg_logger_name.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        # critical level so the startup banner shows regardless of the configured loglevel
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            if config["database"]["driver"] == "mongo":
                self.db = dbmongo.DbMongo()
                self.db.db_connect(config["database"])
            elif config["database"]["driver"] == "memory":
                self.db = dbmemory.DbMemory()
                self.db.db_connect(config["database"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    config["database"]["driver"]))

            if config["storage"]["driver"] == "local":
                self.fs = fslocal.FsLocal()
                self.fs.fs_connect(config["storage"])
            elif config["storage"]["driver"] == "mongo":
                self.fs = fsmongo.FsMongo()
                self.fs.fs_connect(config["storage"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    config["storage"]["driver"]))

            # copy message configuration in order to remove 'group_id' for msg_admin
            # (msg_admin must not share the consumer group, so every worker sees admin pings)
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
            if self.config["tsdb"]["driver"] == "prometheus":
                self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.db, self.loop)
            else:
                raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
                    config["tsdb"]["driver"]))
        else:
            self.prometheus = None
        # per-topic lifecycle managers; all share db/msg/fs/task-registry/config/loop
        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop,
                                             self.ns)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
211
212 async def check_RO_version(self):
213 tries = 14
214 last_error = None
215 while True:
216 try:
217 if self.config["ro_config"].get("ng"):
218 ro_server = NgRoClient(self.loop, **self.config["ro_config"])
219 else:
220 ro_server = ROClient(self.loop, **self.config["ro_config"])
221 ro_version = await ro_server.get_version()
222 if versiontuple(ro_version) < versiontuple(min_RO_version):
223 raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
224 ro_version, min_RO_version))
225 self.logger.info("Connected to RO version {}".format(ro_version))
226 return
227 except (ROClientException, NgRoException) as e:
228 tries -= 1
229 error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
230 if tries <= 0:
231 self.logger.critical(error_text)
232 raise LcmException(error_text)
233 if last_error != error_text:
234 last_error = error_text
235 self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
236 await asyncio.sleep(5)
237
238 async def test(self, param=None):
239 self.logger.debug("Starting/Ending test task: {}".format(param))
240
241 async def kafka_ping(self):
242 self.logger.debug("Task kafka_ping Enter")
243 consecutive_errors = 0
244 first_start = True
245 kafka_has_received = False
246 self.pings_not_received = 1
247 while True:
248 try:
249 await self.msg_admin.aiowrite(
250 "admin", "ping",
251 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
252 self.loop)
253 # time between pings are low when it is not received and at starting
254 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
255 if not self.pings_not_received:
256 kafka_has_received = True
257 self.pings_not_received += 1
258 await asyncio.sleep(wait_time, loop=self.loop)
259 if self.pings_not_received > 10:
260 raise LcmException("It is not receiving pings from Kafka bus")
261 consecutive_errors = 0
262 first_start = False
263 except LcmException:
264 raise
265 except Exception as e:
266 # if not first_start is the first time after starting. So leave more time and wait
267 # to allow kafka starts
268 if consecutive_errors == 8 if not first_start else 30:
269 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
270 raise
271 consecutive_errors += 1
272 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
273 wait_time = 2 if not first_start else 5
274 await asyncio.sleep(wait_time, loop=self.loop)
275
    def kafka_read_callback(self, topic, command, params):
        """Dispatch one kafka message to the corresponding lifecycle operation.

        Called synchronously by msg.aioread / msg_admin.aioread for each received message;
        long-running work is launched as an asyncio task and registered in self.lcm_tasks.
        :param topic: kafka topic: admin, pla, ns, nsi, vim_account, wim_account, sdn,
            k8scluster or k8srepo
        :param command: operation to perform (instantiate, terminate, create, delete, show, ...)
        :param params: operation payload, normally the lcm operation database record
        :return: None
        """
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        self.consecutive_errors = 0
        self.first_start = False
        # NOTE(review): order_id is incremented unconditionally, so every task below is
        # registered with order_id == 2; presumably a leftover of an older sequencing scheme
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            # self-ping: only process pings emitted by this very worker
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                # record the ping timestamp so the --health-check probe can read it
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any task still running for this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vca_status_refresh(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): self.lcm_ns_tasks is not defined anywhere in this class
                    # (presumably stale, pre-TaskRegistry code); the resulting AttributeError
                    # is swallowed by the except below and printed as "nsr ... not found"
                    print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): self.lcm_netslice_tasks is not defined in this class either;
                    # same stale-attribute situation as the "ns show" branch above
                    print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" is already consumed by the delete branch above
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" is already consumed by the delete branch above
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                # NOTE(review): unreachable — "deleted" is already consumed by the delete branch above
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
488
489 async def kafka_read(self):
490 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
491 # future = asyncio.Future()
492 self.consecutive_errors = 0
493 self.first_start = True
494 while self.consecutive_errors < 10:
495 try:
496 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
497 topics_admin = ("admin", )
498 await asyncio.gather(
499 self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
500 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
501 )
502
503 except LcmExceptionExit:
504 self.logger.debug("Bye!")
505 break
506 except Exception as e:
507 # if not first_start is the first time after starting. So leave more time and wait
508 # to allow kafka starts
509 if self.consecutive_errors == 8 if not self.first_start else 30:
510 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
511 raise
512 self.consecutive_errors += 1
513 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
514 wait_time = 2 if not self.first_start else 5
515 await asyncio.sleep(wait_time, loop=self.loop)
516
517 # self.logger.debug("Task kafka_read terminating")
518 self.logger.debug("Task kafka_read exit")
519
    def start(self):
        """Blocking entry point: check RO, start prometheus, run the kafka tasks.

        Returns only after the kafka read/ping tasks finish (exit command or fatal
        error), then closes the event loop and disconnects db/msg/fs.
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        # configure tsdb prometheus
        if self.prometheus:
            self.loop.run_until_complete(self.prometheus.start())

        # run the bus reader and the self-ping health check concurrently until both end
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
553
554 def read_config_file(self, config_file):
555 # TODO make a [ini] + yaml inside parser
556 # the configparser library is not suitable, because it does not admit comments at the end of line,
557 # and not parse integer or boolean
558 try:
559 # read file as yaml format
560 with open(config_file) as f:
561 conf = yaml.load(f, Loader=yaml.Loader)
562 # Ensure all sections are not empty
563 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
564 if not conf.get(k):
565 conf[k] = {}
566
567 # read all environ that starts with OSMLCM_
568 for k, v in environ.items():
569 if not k.startswith("OSMLCM_"):
570 continue
571 subject, _, item = k[7:].lower().partition("_")
572 if not item:
573 continue
574 if subject in ("ro", "vca"):
575 # put in capital letter
576 subject = subject.upper()
577 try:
578 if item == "port" or subject == "timeout":
579 conf[subject][item] = int(v)
580 else:
581 conf[subject][item] = v
582 except Exception as e:
583 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
584
585 # backward compatibility of VCA parameters
586
587 if 'pubkey' in conf["VCA"]:
588 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
589 if 'cacert' in conf["VCA"]:
590 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
591 if 'apiproxy' in conf["VCA"]:
592 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
593
594 if 'enableosupgrade' in conf["VCA"]:
595 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
596 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
597 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
598 conf["VCA"]['enable_os_upgrade'] = False
599 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
600 conf["VCA"]['enable_os_upgrade'] = True
601
602 if 'aptmirror' in conf["VCA"]:
603 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
604
605 return conf
606 except Exception as e:
607 self.logger.critical("At config file '{}': {}".format(config_file, e))
608 exit(1)
609
610 @staticmethod
611 def get_process_id():
612 """
613 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
614 will provide a random one
615 :return: Obtained ID
616 """
617 # Try getting docker id. If fails, get pid
618 try:
619 with open("/proc/self/cgroup", "r") as f:
620 text_id_ = f.readline()
621 _, _, text_id = text_id_.rpartition("/")
622 text_id = text_id.replace('\n', '')[:12]
623 if text_id:
624 return text_id
625 except Exception:
626 pass
627 # Return a random id
628 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
629
630
def usage():
    """Print command-line help for the lcm executable."""
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
639
640
if __name__ == '__main__':

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # deferred import: the health-check probe does not need the full lcm runtime
                from osm_lcm.lcm_hc import health_check
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            # no -c given: look for lcm.cfg next to this file, in the cwd, then in /etc/osm
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)