osm/LCM.git / osm_lcm/lcm.py (commit 659a57c340e9f2c49f5f47f343d39a98ce2d3348)
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from os import environ, path
48 from random import choice as random_choice
49 from n2vc import version as n2vc_version
50 import traceback
51
52 if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
53 pdb.set_trace()
54
55
56 __author__ = "Alfonso Tierno"
57 min_RO_version = "6.0.2"
58 min_n2vc_version = "0.0.2"
59
60 min_common_version = "0.1.19"
61 health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file
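# This file stores the timestamp of the last self-ping received over kafka (see kafka_read_callback);
# the --health-check command line option passes it to osm_lcm.lcm_hc.health_check to decide whether a
# running LCM is still healthy.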
62
63
64 class Lcm:
65
66 ping_interval_pace = 120 # seconds between pings once everything is confirmed to be running
67 ping_interval_boot = 5 # seconds between pings while booting
68 cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
69 # ^ logger name used for each section of lcm.cfg
70
71 def __init__(self, config_file, loop=None):
72 """
73 Init: connect to the database, filesystem storage and message bus
74 :param config_file: path to the configuration file; its top level must contain sections such as 'database', 'storage', 'message', 'RO' and 'VCA'
75 :return: None
76 """
77 self.db = None
78 self.msg = None
79 self.msg_admin = None
80 self.fs = None
81 self.pings_not_received = 1
82 self.consecutive_errors = 0
83 self.first_start = False
84
85 # logging
86 self.logger = logging.getLogger('lcm')
87 # get id
88 self.worker_id = self.get_process_id()
89 # load configuration
90 config = self.read_config_file(config_file)
91 self.config = config
92 self.config["ro_config"] = {
93 "ng": config["RO"].get("ng", False),
94 "uri": config["RO"].get("uri"),
95 "tenant": config.get("tenant", "osm"),
96 "logger_name": "lcm.roclient",
97 "loglevel": config["RO"].get("loglevel", "ERROR"),
98 }
99 if not self.config["ro_config"]["uri"]:
100 self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
101 elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
102 # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
103 index = self.config["ro_config"]["uri"][-1].rfind("/")
104 self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1]
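# Example of the normalization above (illustrative values): an uri such as 'http://ro:9090/openmano'
# becomes 'http://ro:9090/', so that check_RO_version() can probe both '<uri>ro' (new generation RO)
# and '<uri>openmano' (legacy RO).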
105
106 self.loop = loop or asyncio.get_event_loop()
107 self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None
108
109 # logging
110 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
111 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
112 config["database"]["logger_name"] = "lcm.db"
113 config["storage"]["logger_name"] = "lcm.fs"
114 config["message"]["logger_name"] = "lcm.msg"
115 if config["global"].get("logfile"):
116 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
117 maxBytes=100e6, backupCount=9, delay=0)
118 file_handler.setFormatter(log_formatter_simple)
119 self.logger.addHandler(file_handler)
120 if not config["global"].get("nologging"):
121 str_handler = logging.StreamHandler()
122 str_handler.setFormatter(log_formatter_simple)
123 self.logger.addHandler(str_handler)
124
125 if config["global"].get("loglevel"):
126 self.logger.setLevel(config["global"]["loglevel"])
127
128 # logging other modules
129 for k1, logname in self.cfg_logger_name.items():
130 config[k1]["logger_name"] = logname
131 logger_module = logging.getLogger(logname)
132 if config[k1].get("logfile"):
133 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
134 maxBytes=100e6, backupCount=9, delay=0)
135 file_handler.setFormatter(log_formatter_simple)
136 logger_module.addHandler(file_handler)
137 if config[k1].get("loglevel"):
138 logger_module.setLevel(config[k1]["loglevel"])
139 self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))
140
141 # check version of N2VC
142 # TODO enhance with int conversion or from distutils.version import LooseVersion
143 # or with list(map(int, version.split(".")))
144 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
145 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
146 n2vc_version, min_n2vc_version))
147 # check version of common
148 if versiontuple(common_version) < versiontuple(min_common_version):
149 raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
150 common_version, min_common_version))
151
152 try:
153 self.db = Database(config).instance.db
154
155 self.fs = Filesystem(config).instance.fs
156
157 # copy message configuration in order to remove 'group_id' for msg_admin
158 config_message = config["message"].copy()
159 config_message["loop"] = self.loop
160 if config_message["driver"] == "local":
161 self.msg = msglocal.MsgLocal()
162 self.msg.connect(config_message)
163 self.msg_admin = msglocal.MsgLocal()
164 config_message.pop("group_id", None)
165 self.msg_admin.connect(config_message)
166 elif config_message["driver"] == "kafka":
167 self.msg = msgkafka.MsgKafka()
168 self.msg.connect(config_message)
169 self.msg_admin = msgkafka.MsgKafka()
170 config_message.pop("group_id", None)
171 self.msg_admin.connect(config_message)
172 else:
173 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
174 config["message"]["driver"]))
175 except (DbException, FsException, MsgException) as e:
176 self.logger.critical(str(e), exc_info=True)
177 raise LcmException(str(e))
178
179 # contains created tasks/futures to be able to cancel
180 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
181
182 if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
183 if self.config["tsdb"]["driver"] == "prometheus":
184 self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.loop)
185 else:
186 raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
187 config["tsdb"]["driver"]))
188 else:
189 self.prometheus = None
190
191 async def check_RO_version(self):
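# Probe the RO API: first try the new-generation endpoint ('<uri>ro'); if that fails, fall back to
# the legacy openmano endpoint. Retries up to 14 times, waiting 5 seconds between attempts.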
192 tries = 14
193 last_error = None
194 while True:
195 ro_uri = self.config["ro_config"]["uri"]
196 try:
197 # try the new-generation RO first; on failure fall back to the legacy RO (openmano)
198 try:
199 self.config["ro_config"]["uri"] = ro_uri + "ro"
200 ro_server = NgRoClient(self.loop, **self.config["ro_config"])
201 ro_version = await ro_server.get_version()
202 self.config["ro_config"]["ng"] = True
203 except Exception:
204 self.config["ro_config"]["uri"] = ro_uri + "openmano"
205 ro_server = ROClient(self.loop, **self.config["ro_config"])
206 ro_version = await ro_server.get_version()
207 self.config["ro_config"]["ng"] = False
208 if versiontuple(ro_version) < versiontuple(min_RO_version):
209 raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
210 ro_version, min_RO_version))
211 self.logger.info("Connected to RO version {} (new generation: {})".
212 format(ro_version, self.config["ro_config"]["ng"]))
213 return
214 except (ROClientException, NgRoException) as e:
215 self.config["ro_config"]["uri"] = ro_uri
216 tries -= 1
217 traceback.print_tb(e.__traceback__)
218 error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
219 if tries <= 0:
220 self.logger.critical(error_text)
221 raise LcmException(error_text)
222 if last_error != error_text:
223 last_error = error_text
224 self.logger.error(error_text + ". Waiting up to {} more seconds".format(5 * tries))
225 await asyncio.sleep(5)
226
227 async def test(self, param=None):
228 self.logger.debug("Starting/Ending test task: {}".format(param))
229
230 async def kafka_ping(self):
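# Self-ping health loop: periodically publishes a 'ping' addressed to this worker on the 'admin'
# topic; kafka_read_callback() resets self.pings_not_received when the ping is read back. If more
# than 10 consecutive pings go unanswered, an LcmException is raised to flag an unhealthy bus.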
231 self.logger.debug("Task kafka_ping Enter")
232 consecutive_errors = 0
233 first_start = True
234 kafka_has_received = False
235 self.pings_not_received = 1
236 while True:
237 try:
238 await self.msg_admin.aiowrite(
239 "admin", "ping",
240 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
241 self.loop)
242 # the interval between pings is short while booting and until the first ping has been received
243 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
244 if not self.pings_not_received:
245 kafka_has_received = True
246 self.pings_not_received += 1
247 await asyncio.sleep(wait_time, loop=self.loop)
248 if self.pings_not_received > 10:
249 raise LcmException("No self-pings received from the Kafka bus")
250 consecutive_errors = 0
251 first_start = False
252 except LcmException:
253 raise
254 except Exception as e:
255 # on the first start allow more consecutive errors (30 instead of 8) to give kafka
256 # time to come up before giving up
257 if consecutive_errors == (8 if not first_start else 30):
258 self.logger.error("Task kafka_ping exiting, too many consecutive errors. Exception: {}".format(e))
259 raise
260 consecutive_errors += 1
261 self.logger.error("Task kafka_ping retrying after exception: {}".format(e))
262 wait_time = 2 if not first_start else 5
263 await asyncio.sleep(wait_time, loop=self.loop)
264
265 def kafka_read_callback(self, topic, command, params):
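# Dispatcher for kafka messages: each known (topic, command) pair spawns an asyncio task on the
# corresponding LCM module (ns, netslice, vim, wim, sdn, k8scluster, k8srepo) and registers it in
# self.lcm_tasks so it can later be tracked or cancelled.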
266 order_id = 1
267
268 if topic != "admin" and command != "ping":
269 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
270 self.consecutive_errors = 0
271 self.first_start = False
272 order_id += 1
273 if command == "exit":
274 raise LcmExceptionExit
275 elif command.startswith("#"):
276 return
277 elif command == "echo":
278 # just for test
279 print(params)
280 sys.stdout.flush()
281 return
282 elif command == "test":
283 asyncio.Task(self.test(params), loop=self.loop)
284 return
285
286 if topic == "admin":
287 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
288 if params.get("worker_id") != self.worker_id:
289 return
290 self.pings_not_received = 0
291 try:
292 with open(health_check_file, "w") as f:
293 f.write(str(time()))
294 except Exception as e:
295 self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
296 return
297 elif topic == "pla":
298 if command == "placement":
299 self.ns.update_nsrs_with_pla_result(params)
300 return
301 elif topic == "k8scluster":
302 if command == "create" or command == "created":
303 k8scluster_id = params.get("_id")
304 task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
305 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
306 return
307 elif command == "delete" or command == "deleted":
308 k8scluster_id = params.get("_id")
309 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
310 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
311 return
312 elif topic == "k8srepo":
313 if command == "create" or command == "created":
314 k8srepo_id = params.get("_id")
315 self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
316 task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
317 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
318 return
319 elif command == "delete" or command == "deleted":
320 k8srepo_id = params.get("_id")
321 task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
322 self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
323 return
324 elif topic == "ns":
325 if command == "instantiate":
326 # self.logger.debug("Deploying NS {}".format(nsr_id))
327 nslcmop = params
328 nslcmop_id = nslcmop["_id"]
329 nsr_id = nslcmop["nsInstanceId"]
330 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
331 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
332 return
333 elif command == "terminate":
334 # self.logger.debug("Deleting NS {}".format(nsr_id))
335 nslcmop = params
336 nslcmop_id = nslcmop["_id"]
337 nsr_id = nslcmop["nsInstanceId"]
338 self.lcm_tasks.cancel(topic, nsr_id)
339 task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
340 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
341 return
342 elif command == "vca_status_refresh":
343 nslcmop = params
344 nslcmop_id = nslcmop["_id"]
345 nsr_id = nslcmop["nsInstanceId"]
346 task = asyncio.ensure_future(self.ns.vca_status_refresh(nsr_id, nslcmop_id))
347 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task)
348 return
349 elif command == "action":
350 # self.logger.debug("Update NS {}".format(nsr_id))
351 nslcmop = params
352 nslcmop_id = nslcmop["_id"]
353 nsr_id = nslcmop["nsInstanceId"]
354 task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
355 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
356 return
357 elif command == "scale":
358 # self.logger.debug("Update NS {}".format(nsr_id))
359 nslcmop = params
360 nslcmop_id = nslcmop["_id"]
361 nsr_id = nslcmop["nsInstanceId"]
362 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
363 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
364 return
365 elif command == "show":
366 nsr_id = params
367 try:
368 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
369 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
370 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
371 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
372 db_nsr["detailed-status"],
373 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
374 except Exception as e:
375 print("nsr {} not found: {}".format(nsr_id, e))
376 sys.stdout.flush()
377 return
378 elif command == "deleted":
379 return # TODO cleaning of task just in case should be done
380 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
381 return
382 elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
383 if command == "instantiate":
384 # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
385 nsilcmop = params
386 nsilcmop_id = nsilcmop["_id"] # slice operation id
387 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
388 task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
389 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
390 return
391 elif command == "terminate":
392 # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
393 nsilcmop = params
394 nsilcmop_id = nsilcmop["_id"] # slice operation id
395 nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
396 self.lcm_tasks.cancel(topic, nsir_id)
397 task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
398 self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
399 return
400 elif command == "show":
401 nsir_id = params
402 try:
403 db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
404 print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
405 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
406 "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
407 db_nsir["detailed-status"],
408 db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
409 except Exception as e:
410 print("nsir {} not found: {}".format(nsir_id, e))
411 sys.stdout.flush()
412 return
413 elif command == "deleted":
414 return # TODO cleaning of task just in case should be done
415 elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
416 return
417 elif topic == "vim_account":
418 vim_id = params["_id"]
419 if command in ("create", "created"):
420 if not self.config["ro_config"].get("ng"):
421 task = asyncio.ensure_future(self.vim.create(params, order_id))
422 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
423 return
424 elif command == "delete" or command == "deleted":
425 self.lcm_tasks.cancel(topic, vim_id)
426 task = asyncio.ensure_future(self.vim.delete(params, order_id))
427 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
428 return
429 elif command == "show":
430 print("show not implemented for vim_account")
431 sys.stdout.flush()
432 return
433 elif command in ("edit", "edited"):
434 if not self.config["ro_config"].get("ng"):
435 task = asyncio.ensure_future(self.vim.edit(params, order_id))
436 self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
437 return
438 elif command == "deleted":
439 return # TODO cleaning of task just in case should be done
440 elif topic == "wim_account":
441 wim_id = params["_id"]
442 if command in ("create", "created"):
443 if not self.config["ro_config"].get("ng"):
444 task = asyncio.ensure_future(self.wim.create(params, order_id))
445 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
446 return
447 elif command == "delete" or command == "deleted":
448 self.lcm_tasks.cancel(topic, wim_id)
449 task = asyncio.ensure_future(self.wim.delete(params, order_id))
450 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
451 return
452 elif command == "show":
453 print("show not implemented for wim_account")
454 sys.stdout.flush()
455 return
456 elif command in ("edit", "edited"):
457 task = asyncio.ensure_future(self.wim.edit(params, order_id))
458 self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
459 return
460 elif command == "deleted":
461 return # TODO cleaning of task just in case should be done
462 elif topic == "sdn":
463 _sdn_id = params["_id"]
464 if command in ("create", "created"):
465 if not self.config["ro_config"].get("ng"):
466 task = asyncio.ensure_future(self.sdn.create(params, order_id))
467 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
468 return
469 elif command == "delete" or command == "deleted":
470 self.lcm_tasks.cancel(topic, _sdn_id)
471 task = asyncio.ensure_future(self.sdn.delete(params, order_id))
472 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
473 return
474 elif command in ("edit", "edited"):
475 task = asyncio.ensure_future(self.sdn.edit(params, order_id))
476 self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
477 return
478 elif command == "deleted":
479 return # TODO cleaning of task just in case should be done
480 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
481
482 async def kafka_read(self):
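# Main consumer loop: operation topics are read through self.msg (with the configured group_id, so
# work is shared when several LCM workers run), while the 'admin' topic is read through
# self.msg_admin with group_id disabled, so every worker receives its own self-pings.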
483 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
484 # future = asyncio.Future()
485 self.consecutive_errors = 0
486 self.first_start = True
487 while self.consecutive_errors < 10:
488 try:
489 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
490 topics_admin = ("admin", )
491 await asyncio.gather(
492 self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
493 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
494 )
495
496 except LcmExceptionExit:
497 self.logger.debug("Bye!")
498 break
499 except Exception as e:
500 # on the first start allow more consecutive errors (30 instead of 8) to give kafka
501 # time to come up before giving up
502 if self.consecutive_errors == (8 if not self.first_start else 30):
503 self.logger.error("Task kafka_read exiting, too many consecutive errors. Exception: {}".format(e))
504 raise
505 self.consecutive_errors += 1
506 self.logger.error("Task kafka_read retrying after exception: {}".format(e))
507 wait_time = 2 if not self.first_start else 5
508 await asyncio.sleep(wait_time, loop=self.loop)
509
510 # self.logger.debug("Task kafka_read terminating")
511 self.logger.debug("Task kafka_read exit")
512
513 def start(self):
514
515 # check RO version
516 self.loop.run_until_complete(self.check_RO_version())
517
518 self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus)
519 self.netslice = netslice.NetsliceLcm(self.msg, self.lcm_tasks, self.config, self.loop,
520 self.ns)
521 self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
522 self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
523 self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
524 self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
525 self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)
526
527 # configure tsdb prometheus
528 if self.prometheus:
529 self.loop.run_until_complete(self.prometheus.start())
530
531 self.loop.run_until_complete(asyncio.gather(
532 self.kafka_read(),
533 self.kafka_ping()
534 ))
535 # TODO
536 # self.logger.debug("Terminating cancelling creation tasks")
537 # self.lcm_tasks.cancel("ALL", "create")
538 # timeout = 200
539 # while self.is_pending_tasks():
540 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
541 # await asyncio.sleep(2, loop=self.loop)
542 # timeout -= 2
543 # if not timeout:
544 # self.lcm_tasks.cancel("ALL", "ALL")
545 self.loop.close()
546 self.loop = None
547 if self.db:
548 self.db.db_disconnect()
549 if self.msg:
550 self.msg.disconnect()
551 if self.msg_admin:
552 self.msg_admin.disconnect()
553 if self.fs:
554 self.fs.fs_disconnect()
555
556 def read_config_file(self, config_file):
557 # TODO make a [ini] + yaml inside parser
558 # the configparser library is not suitable because it does not allow comments at the end of a line
559 # and does not parse integers or booleans
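# Illustrative (not normative) minimal lcm.cfg, plain YAML; key names other than those referenced
# in this file are assumptions:
#   global:   {loglevel: DEBUG}
#   RO:       {host: ro, port: 9090}
#   VCA:      {host: vca, port: 17070}
#   database: {driver: mongo, uri: mongodb://mongo:27017}
#   storage:  {driver: local, path: /app/storage}
#   message:  {driver: kafka, host: kafka, port: 9092, group_id: lcm-server}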
560 try:
561 # read file as yaml format
562 with open(config_file) as f:
563 conf = yaml.load(f, Loader=yaml.Loader)
564 # Ensure every expected section exists (default missing or empty ones to an empty dict)
565 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
566 if not conf.get(k):
567 conf[k] = {}
568
569 # override configuration with environment variables that start with OSMLCM_
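# e.g. OSMLCM_DATABASE_HOST=mongo -> conf["database"]["host"] = "mongo"
#      OSMLCM_RO_PORT=9090        -> conf["RO"]["port"] = 9090 (converted to int)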
570 for k, v in environ.items():
571 if not k.startswith("OSMLCM_"):
572 continue
573 subject, _, item = k[7:].lower().partition("_")
574 if not item:
575 continue
576 if subject in ("ro", "vca"):
577 # the 'RO' and 'VCA' section names are upper-case in the configuration
578 subject = subject.upper()
579 try:
580 if item == "port" or subject == "timeout":
581 conf[subject][item] = int(v)
582 else:
583 conf[subject][item] = v
584 except Exception as e:
585 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
586
587 # backward compatibility of VCA parameters
588
589 if 'pubkey' in conf["VCA"]:
590 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
591 if 'cacert' in conf["VCA"]:
592 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
593 if 'apiproxy' in conf["VCA"]:
594 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
595
596 if 'enableosupgrade' in conf["VCA"]:
597 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
598 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
599 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
600 conf["VCA"]['enable_os_upgrade'] = False
601 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
602 conf["VCA"]['enable_os_upgrade'] = True
603
604 if 'aptmirror' in conf["VCA"]:
605 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
606
607 return conf
608 except Exception as e:
609 self.logger.critical("At config file '{}': {}".format(config_file, e))
610 exit(1)
611
612 @staticmethod
613 def get_process_id():
614 """
615 Obtain a unique ID for this process. If running inside a docker container it will be the container ID;
616 otherwise a random one is generated
617 :return: Obtained ID
618 """
619 # Try to get the docker container id; if that fails, fall back to a random id
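# With cgroup v1 the first line of /proc/self/cgroup typically ends in '/docker/<container-id>',
# so the first 12 characters of the last path component match the short docker container id.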
620 try:
621 with open("/proc/self/cgroup", "r") as f:
622 text_id_ = f.readline()
623 _, _, text_id = text_id_.rpartition("/")
624 text_id = text_id.replace('\n', '')[:12]
625 if text_id:
626 return text_id
627 except Exception:
628 pass
629 # Return a random id
630 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
631
632
633 def usage():
634 print("""Usage: {} [options]
635 -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
636 --health-check: do not run lcm; just check the time of the last kafka self-ping to determine whether a running lcm is healthy
637 -h|--help: shows this help
638 """.format(sys.argv[0]))
639 # --log-socket-host HOST: send logs to this host")
640 # --log-socket-port PORT: send logs using this port (default: 9022)")
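# Illustrative invocations (paths are examples only):
#   python3 -m osm_lcm.lcm -c /etc/osm/lcm.cfg
#   python3 -m osm_lcm.lcm --health-check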
641
642
643 if __name__ == '__main__':
644
645 try:
646 # print("SYS.PATH='{}'".format(sys.path))
647 # load parameters and configuration
648 # -h
649 # -c value
650 # --config value
651 # --help
652 # --health-check
653 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
654 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
655 config_file = None
656 for o, a in opts:
657 if o in ("-h", "--help"):
658 usage()
659 sys.exit()
660 elif o in ("-c", "--config"):
661 config_file = a
662 elif o == "--health-check":
663 from osm_lcm.lcm_hc import health_check
664 health_check(health_check_file, Lcm.ping_interval_pace)
665 # elif o == "--log-socket-port":
666 # log_socket_port = a
667 # elif o == "--log-socket-host":
668 # log_socket_host = a
669 # elif o == "--log-file":
670 # log_file = a
671 else:
672 assert False, "Unhandled option"
673
674 if config_file:
675 if not path.isfile(config_file):
676 print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
677 exit(1)
678 else:
679 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
680 if path.isfile(config_file):
681 break
682 else:
683 print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
684 exit(1)
685 lcm = Lcm(config_file)
686 lcm.start()
687 except (LcmException, getopt.GetoptError) as e:
688 print(str(e), file=sys.stderr)
689 # usage()
690 exit(1)