feat(sol006): LCM migration to SOL006
osm_lcm/lcm.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns, prometheus, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
import traceback

if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
health_check_file = path.expanduser("~") + "/time_last_ping"   # TODO find a better location for this file


class Lcm:

    ping_interval_pace = 120  # interval in seconds between pings once it is confirmed that everything is running
    ping_interval_boot = 5  # interval in seconds between pings while booting
    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
    # ^ maps each section of lcm.cfg to the logger name used for it

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path to the configuration file. It must provide a two-level dictionary whose top
            level contains at least the 'database', 'storage' and 'message' sections
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
        elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
            # uri ends with '/ro', '/ro/', '/openmano' or '/openmano/': strip the last path component so that
            # only the base URI is kept (check_RO_version appends 'ro' or 'openmano' as needed)
            index = self.config["ro_config"]["uri"][:-1].rfind("/")
            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][:index + 1]

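        # Illustrative examples (assumption, values are hypothetical) of the normalization above:
        #     uri = "http://ro:9090/ro/"       -> ro_config["uri"] = "http://ro:9090/"
        #     uri = "http://ro:9090/openmano"  -> ro_config["uri"] = "http://ro:9090/"
        # check_RO_version() later tries "<base>ro" (new-generation RO) and falls back to "<base>openmano".
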
        self.loop = loop or asyncio.get_event_loop()
        self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in self.cfg_logger_name.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            self.db = Database(config).instance.db

            self.fs = Filesystem(config).instance.fs

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # contains created tasks/futures to be able to cancel them
        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)

        if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
            if self.config["tsdb"]["driver"] == "prometheus":
                self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.loop)
            else:
                raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
                    config["tsdb"]["driver"]))
        else:
            self.prometheus = None

    async def check_RO_version(self):
        tries = 14
        last_error = None
        while True:
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try the new RO first; if it fails, fall back to the old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {} (new generation: {})".format(
                    ro_version, self.config["ro_config"]["ng"]))
                return
            except (ROClientException, NgRoException) as e:
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting up to {} more seconds".format(5 * tries))
                await asyncio.sleep(5)

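    # Note (illustrative arithmetic, not new behaviour): with tries = 14 and a fixed 5-second sleep between
    # attempts, an unreachable RO is tolerated for roughly 14 * 5 = 70 seconds before LcmException is raised.
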
    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # pings are sent more frequently while booting and until the first ping is received back
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # on the first start allow more retries, to give Kafka time to come up
                if consecutive_errors == (8 if not first_start else 30):
                    self.logger.error("Task kafka_ping exit error: too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

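    # Illustrative sketch (derived from the code above, values are hypothetical): the self-ping published on
    # the "admin" topic looks like
    #     topic="admin", command="ping",
    #     params={"from": "lcm", "to": "lcm", "worker_id": "<12-char id>", "version": "<lcm version>"}
    # kafka_read_callback() resets self.pings_not_received when it receives a ping carrying its own
    # worker_id and refreshes health_check_file, which is what the '--health-check' option inspects.
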
    def kafka_read_callback(self, topic, command, params):
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

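    # Illustrative example (hypothetical identifiers) of a message handled by kafka_read_callback():
    #     topic="ns", command="instantiate",
    #     params={"_id": "<nslcmop id>", "nsInstanceId": "<nsr id>", ...}
    # which spawns self.ns.instantiate(<nsr id>, <nslcmop id>) and registers the task under
    # ("ns", <nsr id>, <nslcmop id>, "ns_instantiate") so that a later "terminate" can cancel it.
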
    async def kafka_read(self):
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # on the first start allow more retries, to give Kafka time to come up
                if self.consecutive_errors == (8 if not self.first_start else 30):
                    self.logger.error("Task kafka_read exit error: too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus)
        self.netslice = netslice.NetsliceLcm(self.msg, self.lcm_tasks, self.config, self.loop,
                                             self.ns)
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)

        # configure tsdb prometheus
        if self.prometheus:
            self.loop.run_until_complete(self.prometheus.start())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        # TODO make an [ini] + yaml inside parser
        # the configparser library is not suitable because it does not allow comments at the end of a line
        # and does not parse integers or booleans
        try:
            # read file as yaml format
            with open(config_file) as f:
                conf = yaml.load(f, Loader=yaml.Loader)
            # ensure that no section is missing or empty
            for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
                if not conf.get(k):
                    conf[k] = {}

            # read all environment variables that start with OSMLCM_
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # these section names are upper case in the configuration
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            # backward compatibility of VCA parameters

            if 'pubkey' in conf["VCA"]:
                conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
            if 'cacert' in conf["VCA"]:
                conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
            if 'apiproxy' in conf["VCA"]:
                conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')

            if 'enableosupgrade' in conf["VCA"]:
                conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
            if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
                if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
                    conf["VCA"]['enable_os_upgrade'] = False
                elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
                    conf["VCA"]['enable_os_upgrade'] = True

            if 'aptmirror' in conf["VCA"]:
                conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

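    # Illustrative sketch (assumption: a minimal lcm.cfg; the actual sections and keys depend on the deployment):
    #
    #     global:
    #         loglevel: DEBUG
    #     RO:
    #         host: ro
    #         port: 9090
    #     database:
    #         driver: mongo
    #         uri: mongodb://mongo:27017
    #     storage:
    #         driver: local
    #         path: /app/storage
    #     message:
    #         driver: kafka
    #         host: kafka
    #         port: 9092
    #
    # Environment overrides follow the OSMLCM_<SECTION>_<item> convention implemented above, for example
    # (hypothetical values):
    #     OSMLCM_RO_HOST=ro          -> conf["RO"]["host"] = "ro"
    #     OSMLCM_RO_PORT=9090        -> conf["RO"]["port"] = 9090  (port values are cast to int)
    #     OSMLCM_DATABASE_URI=...    -> conf["database"]["uri"] = "..."
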
    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. If running inside a Docker container it will return the
        container ID; otherwise a random ID is generated
        :return: Obtained ID
        """
        # Try getting docker id. If it fails, get pid
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
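
    # Illustrative note (assumption about the runtime environment): with Docker and cgroup v1 the first line
    # of /proc/self/cgroup typically looks like
    #     12:devices:/docker/0123456789abcdef...   (a 64-character hexadecimal container id)
    # so rpartition("/") keeps the container id and the first 12 characters are returned, matching the short
    # id shown by 'docker ps'. When that line ends in "/" (e.g. outside a container) text_id is empty and a
    # random 12-character hexadecimal id is returned instead.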


def usage():
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")

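
# Illustrative invocations (paths and script name are examples only):
#     ./lcm.py -c /etc/osm/lcm.cfg    # run the LCM with an explicit configuration file
#     ./lcm.py --health-check         # intended for container healthchecks; inspects health_check_file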

if __name__ == '__main__':

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found in the local folder or in /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)