Fix bug 1604 - Adding fs.sync() at startup
[osm/LCM.git] / osm_lcm / lcm.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns, prometheus, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback

if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
health_check_file = path.expanduser("~") + "/time_last_ping"  # TODO find better location for this file


class Lcm:

    ping_interval_pace = 120  # seconds between pings once it is confirmed that everything is running
    ping_interval_boot = 5  # seconds between pings while booting
    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
    # ^ maps each section of lcm.cfg to the logger name used for it

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the configuration file. It is parsed as a two-level dictionary whose top
            level should contain 'global', 'RO', 'database', 'storage' and 'message' sections
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
        elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
            # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'; strip it and keep only the base url
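            # e.g. "http://ro:9090/ro" or "http://ro:9090/openmano/" are reduced to "http://ro:9090/"
            # (host and port above are only illustrative values)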
            index = self.config["ro_config"]["uri"][:-1].rfind("/")
            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][:index+1]

        self.loop = loop or asyncio.get_event_loop()
        self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in self.cfg_logger_name.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            self.db = Database(config).instance.db

            self.fs = Filesystem(config).instance.fs
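            # sync the local file-system copy with the common storage at startup, so that content
            # uploaded before this LCM instance started is available locally (fix for bug 1604)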
            self.fs.sync()

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

        if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
            if self.config["tsdb"]["driver"] == "prometheus":
                self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.loop)
            else:
                raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
                    config["tsdb"]["driver"]))
        else:
            self.prometheus = None

    async def check_RO_version(self):
        tries = 14
        last_error = None
        while True:
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try the new-generation RO first; if it fails, fall back to the old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {} (new generation: {})".
                                 format(ro_version, self.config["ro_config"]["ng"]))
                return
            except (ROClientException, NgRoException) as e:
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Retrying for up to {} more seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
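        # each iteration writes a ping to the admin topic; kafka_read_callback is expected to read it back
        # and reset self.pings_not_received. If no ping is echoed back after several attempts, the kafka
        # bus is considered unhealthy and an exception is raised.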
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # pings are sent more frequently while booting or while they are not being received back
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # if first_start is True this is the first retry loop after starting, so wait longer
                # to allow kafka to start
                if consecutive_errors == (8 if not first_start else 30):
                    self.logger.error("Task kafka_ping exiting: too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after exception: {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

    def kafka_read_callback(self, topic, command, params):
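        """Dispatch a single message read from the kafka bus according to its topic and command."""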
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
                topics_admin = ("admin", )
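                # the admin topic is read on a separate consumer without a kafka group_id, so that every
                # LCM worker receives its own ping messages (see the worker_id check in kafka_read_callback)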
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # if self.first_start is True this is the first retry loop after starting, so wait longer
                # to allow kafka to start
                if self.consecutive_errors == (8 if not self.first_start else 30):
                    self.logger.error("Task kafka_read exiting: too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after exception: {}".format(e))
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus)
        self.netslice = netslice.NetsliceLcm(self.msg, self.lcm_tasks, self.config, self.loop,
                                             self.ns)
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)

        # configure tsdb prometheus
        if self.prometheus:
            self.loop.run_until_complete(self.prometheus.start())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
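        # kafka_read and kafka_ping run concurrently on the event loop; once they finish, the loop is
        # closed and the database, message and storage connections are released below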
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable because it does not admit comments at the end of a line
        # and does not parse integers or booleans
        try:
            # read file as yaml format
            with open(config_file) as f:
                conf = yaml.load(f, Loader=yaml.Loader)
            # Ensure all sections are not empty
            for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
                if not conf.get(k):
                    conf[k] = {}

            # read all environment variables that start with OSMLCM_
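            # e.g. OSMLCM_RO_PORT=9090 sets conf["RO"]["port"] = 9090, and OSMLCM_DATABASE_HOST=mongo
            # sets conf["database"]["host"] = "mongo" (the values here are only illustrative)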
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # put in capital letter
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            # backward compatibility of VCA parameters

            if 'pubkey' in conf["VCA"]:
                conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
            if 'cacert' in conf["VCA"]:
                conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
            if 'apiproxy' in conf["VCA"]:
                conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')

            if 'enableosupgrade' in conf["VCA"]:
                conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
            if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
                if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
                    conf["VCA"]['enable_os_upgrade'] = False
                elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
                    conf["VCA"]['enable_os_upgrade'] = True

            if 'aptmirror' in conf["VCA"]:
                conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running from inside docker, it will get the docker ID. If not,
        it will provide a random one
        :return: Obtained ID
        """
        # Try getting the docker id. If it fails, fall back to a random id
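        # the first line of /proc/self/cgroup typically looks like "12:devices:/docker/<64-hex-container-id>";
        # the last path component is kept and truncated to the 12-character short docker id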
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
            if text_id:
                return text_id
        except Exception:
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))


def usage():
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


if __name__ == '__main__':

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found at the local folder or at /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)