#!/usr/bin/python3
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##


# DEBUG WITH PDB
import os
import pdb

import asyncio
import yaml
import logging
import logging.handlers
import getopt
import sys

from osm_lcm import ns
from osm_lcm import vim_sdn
from osm_lcm import netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException

from time import time
from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date

from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version

if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
health_check_file = path.expanduser("~") + "/time_last_ping"   # TODO find better location for this file


class Lcm:
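    """
    OSM Life Cycle Management module: reads requests from the Kafka bus and launches the corresponding asynchronous
    tasks for NS, network-slice, VIM, WIM, SDN controller, K8s cluster and K8s repo operations.
    """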

    ping_interval_pace = 120  # interval (in seconds) between pings once it is confirmed that everything is running
    ping_interval_boot = 5  # interval (in seconds) between pings while booting

    def __init__(self, config_file, loop=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the configuration file. It must provide a two-level dictionary whose top level
            contains at least the 'database', 'storage' and 'message' sections
        :param loop: asyncio event loop. If not provided, the default/current event loop is used
        :return: None
        """
        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
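        # RO (Resource Orchestrator) client settings; the "ng" flag selects the NgRoClient
        # instead of the classic ROClient (see check_RO_version)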
        self.config["ro_config"] = {
            "ng": config["RO"].get("ng", False),
            "uri": config["RO"].get("uri"),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.roclient",
            "loglevel": config["RO"].get("loglevel", "ERROR"),
        }
        if not self.config["ro_config"]["uri"]:
            if not self.config["ro_config"]["ng"]:
                self.config["ro_config"]["uri"] = "http://{}:{}/openmano".format(config["RO"]["host"],
                                                                                 config["RO"]["port"])
            else:
                self.config["ro_config"]["uri"] = "http://{}:{}/ro".format(config["RO"]["host"], config["RO"]["port"])

        self.loop = loop or asyncio.get_event_loop()

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        config["database"]["logger_name"] = "lcm.db"
        config["storage"]["logger_name"] = "lcm.fs"
        config["message"]["logger_name"] = "lcm.msg"
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            if config["database"]["driver"] == "mongo":
                self.db = dbmongo.DbMongo()
                self.db.db_connect(config["database"])
            elif config["database"]["driver"] == "memory":
                self.db = dbmemory.DbMemory()
                self.db.db_connect(config["database"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    config["database"]["driver"]))

            if config["storage"]["driver"] == "local":
                self.fs = fslocal.FsLocal()
                self.fs.fs_connect(config["storage"])
            elif config["storage"]["driver"] == "mongo":
                self.fs = fsmongo.FsMongo()
                self.fs.fs_connect(config["storage"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    config["storage"]["driver"]))

            # copy message configuration in order to remove 'group_id' for msg_admin
            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e))

        # registry of created tasks/futures, so that they can be looked up and cancelled
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)

    async def check_RO_version(self):
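        """
        Connect to the Resource Orchestrator (RO or NG-RO, depending on configuration) and check that its version
        is at least min_RO_version. Retries up to 14 times, waiting 5 seconds between attempts, and raises
        LcmException if the RO cannot be reached or its version is too old.
        """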
        tries = 14
        last_error = None
        while True:
            try:
                if self.config["ro_config"].get("ng"):
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                else:
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                ro_version = await ro_server.get_version()
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {}".format(ro_version))
                return
            except (ROClientException, NgRoException) as e:
                tries -= 1
                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)

    async def test(self, param=None):
        self.logger.debug("Starting/Ending test task: {}".format(param))

    async def kafka_ping(self):
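        """
        Periodically write a 'ping' message to the 'admin' topic of the Kafka bus addressed to this same worker.
        kafka_read_callback resets self.pings_not_received when the ping comes back, so if the counter keeps
        growing the bus is considered broken and an LcmException is raised.
        """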
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # the interval between pings is short while booting and until a ping has come back
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("Pings are not being received from the Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # during the first start more consecutive errors are tolerated, to give kafka time to come up
                if consecutive_errors == (8 if not first_start else 30):
                    self.logger.error("Task kafka_ping exit: too many consecutive errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

    def kafka_read_callback(self, topic, command, params):
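        """
        Callback invoked by kafka_read for every message received on the subscribed topics. It dispatches the
        command to the corresponding module (ns, netslice, vim, wim, sdn, k8scluster, k8srepo) and registers the
        created asyncio task in self.lcm_tasks.
        :param topic: kafka topic of the message (e.g. 'ns', 'vim_account', 'admin', ...)
        :param command: requested operation (e.g. 'instantiate', 'terminate', 'create', 'delete', 'ping', ...)
        :param params: content of the message, normally the operation parameters or a record identifier
        :return: None
        """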
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
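        """
        Main read loop: subscribe to the LCM topics (and to 'admin' without group_id for the pings) and process
        every received message through kafka_read_callback. The loop exits on LcmExceptionExit or after too many
        consecutive errors.
        """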
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        while self.consecutive_errors < 10:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # during the first start more consecutive errors are tolerated, to give kafka time to come up
                if self.consecutive_errors == (8 if not self.first_start else 30):
                    self.logger.error("Task kafka_read exit: too many consecutive errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        # self.logger.debug("Task kafka_read terminating")
        self.logger.debug("Task kafka_read exit")

    def start(self):
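        """
        Blocking entry point: check the RO version, then run the kafka_read and kafka_ping tasks until they finish,
        and finally close the loop and disconnect from database, message bus and filesystem storage.
        """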

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
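        """
        Load the YAML configuration file, make sure the expected sections exist, apply OSMLCM_* environment
        variable overrides and the backward-compatible renaming of VCA parameters.
        :param config_file: path of the configuration file
        :return: configuration dictionary
        """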
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable because it does not allow comments at the end of a line
        # and does not parse integers or booleans
        try:
            # read file as yaml format
            with open(config_file) as f:
                conf = yaml.load(f, Loader=yaml.Loader)
            # Ensure all sections are not empty
            for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
                if not conf.get(k):
                    conf[k] = {}

            # read all environment variables that start with OSMLCM_
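            # e.g. OSMLCM_RO_PORT=9090 becomes conf["RO"]["port"] = 9090 and
            # OSMLCM_DATABASE_HOST=mongo becomes conf["database"]["host"] = "mongo"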
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                subject, _, item = k[7:].lower().partition("_")
                if not item:
                    continue
                if subject in ("ro", "vca"):
                    # these section names are kept in upper case in the configuration
                    subject = subject.upper()
                try:
                    if item == "port" or subject == "timeout":
                        conf[subject][item] = int(v)
                    else:
                        conf[subject][item] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            # backward compatibility of VCA parameters

            if 'pubkey' in conf["VCA"]:
                conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
            if 'cacert' in conf["VCA"]:
                conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
            if 'apiproxy' in conf["VCA"]:
                conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')

            if 'enableosupgrade' in conf["VCA"]:
                conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
            if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
                if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
                    conf["VCA"]['enable_os_upgrade'] = False
                elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
                    conf["VCA"]['enable_os_upgrade'] = True

            if 'aptmirror' in conf["VCA"]:
                conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            exit(1)

    @staticmethod
    def get_process_id():
592 """
593 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
594 will provide a random one
595 :return: Obtained ID
596 """
597 # Try getting docker id. If fails, get pid
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
            if text_id:
                return text_id
        except Exception:
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))


def usage():
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")


if __name__ == '__main__':

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found in the local folder or at /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)