ebfca7e957a927e17acc3259896bf3ea9b6c3054
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from osm_lcm.data_utils.database.database import Database
46 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
47 from os import environ, path
48 from random import choice as random_choice
49 from n2vc import version as n2vc_version
50 import traceback
51
# Drop into the pdb debugger at import time when requested through the environment
if os.environ.get('OSMLCM_PDB_DEBUG') is not None:
    pdb.set_trace()
54
55
__author__ = "Alfonso Tierno"
# Minimum compatible versions of peer OSM components; checked at start-up and a
# LcmException is raised when the running component is older
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
# File re-written with the current timestamp on every received self-ping; inspected by
# the '--health-check' command-line mode to decide whether this lcm process is alive
health_check_file = path.expanduser("~") + "/time_last_ping"  # TODO find better location for this file
62
63
class Lcm:
    """
    OSM Life Cycle Manager service object.

    Connects to the database, filesystem storage and kafka message bus, and dispatches
    lifecycle operations (NS, NSI, VIM, WIM, SDN, K8s cluster/repo, VCA) received over
    kafka to the corresponding handler modules.
    """

    ping_interval_pace = 120  # how many time ping is send once is confirmed all is running
    ping_interval_boot = 5  # how many time ping is sent when booting
    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
    # ^ contains for each section at lcm.cfg the used logger name
70
71 def __init__(self, config_file, loop=None):
72 """
73 Init, Connect to database, filesystem storage, and messaging
74 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
75 :return: None
76 """
77 self.db = None
78 self.msg = None
79 self.msg_admin = None
80 self.fs = None
81 self.pings_not_received = 1
82 self.consecutive_errors = 0
83 self.first_start = False
84
85 # logging
86 self.logger = logging.getLogger('lcm')
87 # get id
88 self.worker_id = self.get_process_id()
89 # load configuration
90 config = self.read_config_file(config_file)
91 self.config = config
92 self.config["ro_config"] = {
93 "ng": config["RO"].get("ng", False),
94 "uri": config["RO"].get("uri"),
95 "tenant": config.get("tenant", "osm"),
96 "logger_name": "lcm.roclient",
97 "loglevel": config["RO"].get("loglevel", "ERROR"),
98 }
99 if not self.config["ro_config"]["uri"]:
100 self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
101 elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
102 # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
103 index = self.config["ro_config"]["uri"][-1].rfind("/")
104 self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1]
105
106 self.loop = loop or asyncio.get_event_loop()
107 self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
108
109 # logging
110 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
111 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
112 config["database"]["logger_name"] = "lcm.db"
113 config["storage"]["logger_name"] = "lcm.fs"
114 config["message"]["logger_name"] = "lcm.msg"
115 if config["global"].get("logfile"):
116 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
117 maxBytes=100e6, backupCount=9, delay=0)
118 file_handler.setFormatter(log_formatter_simple)
119 self.logger.addHandler(file_handler)
120 if not config["global"].get("nologging"):
121 str_handler = logging.StreamHandler()
122 str_handler.setFormatter(log_formatter_simple)
123 self.logger.addHandler(str_handler)
124
125 if config["global"].get("loglevel"):
126 self.logger.setLevel(config["global"]["loglevel"])
127
128 # logging other modules
129 for k1, logname in self.cfg_logger_name.items():
130 config[k1]["logger_name"] = logname
131 logger_module = logging.getLogger(logname)
132 if config[k1].get("logfile"):
133 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
134 maxBytes=100e6, backupCount=9, delay=0)
135 file_handler.setFormatter(log_formatter_simple)
136 logger_module.addHandler(file_handler)
137 if config[k1].get("loglevel"):
138 logger_module.setLevel(config[k1]["loglevel"])
139 self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))
140
141 # check version of N2VC
142 # TODO enhance with int conversion or from distutils.version import LooseVersion
143 # or with list(map(int, version.split(".")))
144 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
145 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
146 n2vc_version, min_n2vc_version))
147 # check version of common
148 if versiontuple(common_version) < versiontuple(min_common_version):
149 raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
150 common_version, min_common_version))
151
152 try:
153 self.db = Database(config).instance.db
154
155 self.fs = Filesystem(config).instance.fs
156
157 # copy message configuration in order to remove 'group_id' for msg_admin
158 config_message = config["message"].copy()
159 config_message["loop"] = self.loop
160 if config_message["driver"] == "local":
161 self.msg = msglocal.MsgLocal()
162 self.msg.connect(config_message)
163 self.msg_admin = msglocal.MsgLocal()
164 config_message.pop("group_id", None)
165 self.msg_admin.connect(config_message)
166 elif config_message["driver"] == "kafka":
167 self.msg = msgkafka.MsgKafka()
168 self.msg.connect(config_message)
169 self.msg_admin = msgkafka.MsgKafka()
170 config_message.pop("group_id", None)
171 self.msg_admin.connect(config_message)
172 else:
173 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
174 config["message"]["driver"]))
175 except (DbException, FsException, MsgException) as e:
176 self.logger.critical(str(e), exc_info=True)
177 raise LcmException(str(e))
178
179 # contains created tasks/futures to be able to cancel
180 self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
181
182 if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
183 if self.config["tsdb"]["driver"] == "prometheus":
184 self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.loop)
185 else:
186 raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
187 config["tsdb"]["driver"]))
188 else:
189 self.prometheus = None
190
    async def check_RO_version(self):
        """
        Wait until RO is reachable and verify its version is compatible.
        Tries the new-generation endpoint ('<uri>ro') first and falls back to the classic
        openmano endpoint ('<uri>openmano'); records which one answered at
        self.config["ro_config"]["ng"]. Retries up to 14 times, sleeping 5 seconds between tries.
        :raise LcmException: when RO cannot be contacted after all tries, or its version is too old
        """
        tries = 14
        last_error = None
        while True:
            # remember the base uri; it is temporarily overwritten with the endpoint being probed
            ro_uri = self.config["ro_config"]["uri"]
            try:
                # try new RO, if fail old RO
                try:
                    self.config["ro_config"]["uri"] = ro_uri + "ro"
                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = True
                except Exception:
                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
                    ro_server = ROClient(self.loop, **self.config["ro_config"])
                    ro_version = await ro_server.get_version()
                    self.config["ro_config"]["ng"] = False
                if versiontuple(ro_version) < versiontuple(min_RO_version):
                    # note: LcmException is not caught by the except clause below, so this propagates
                    raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                        ro_version, min_RO_version))
                self.logger.info("Connected to RO version {} new-generation version {}".
                                 format(ro_version, self.config["ro_config"]["ng"]))
                return
            except (ROClientException, NgRoException) as e:
                # restore the base uri before the next retry
                self.config["ro_config"]["uri"] = ro_uri
                tries -= 1
                traceback.print_tb(e.__traceback__)
                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
                if tries <= 0:
                    self.logger.critical(error_text)
                    raise LcmException(error_text)
                # only log when the error text changes, to avoid flooding the log while RO boots
                if last_error != error_text:
                    last_error = error_text
                    self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
                await asyncio.sleep(5)
226
227 async def test(self, param=None):
228 self.logger.debug("Starting/Ending test task: {}".format(param))
229
230 async def kafka_ping(self):
231 self.logger.debug("Task kafka_ping Enter")
232 consecutive_errors = 0
233 first_start = True
234 kafka_has_received = False
235 self.pings_not_received = 1
236 while True:
237 try:
238 await self.msg_admin.aiowrite(
239 "admin", "ping",
240 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
241 self.loop)
242 # time between pings are low when it is not received and at starting
243 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
244 if not self.pings_not_received:
245 kafka_has_received = True
246 self.pings_not_received += 1
247 await asyncio.sleep(wait_time, loop=self.loop)
248 if self.pings_not_received > 10:
249 raise LcmException("It is not receiving pings from Kafka bus")
250 consecutive_errors = 0
251 first_start = False
252 except LcmException:
253 raise
254 except Exception as e:
255 # if not first_start is the first time after starting. So leave more time and wait
256 # to allow kafka starts
257 if consecutive_errors == 8 if not first_start else 30:
258 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
259 raise
260 consecutive_errors += 1
261 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
262 wait_time = 2 if not first_start else 5
263 await asyncio.sleep(wait_time, loop=self.loop)
264
    def kafka_read_callback(self, topic, command, params):
        """
        Dispatch one message received from the kafka bus to the module in charge of its topic.
        For lifecycle operations an asyncio task is created and registered at self.lcm_tasks so
        that it can be cancelled later.
        :param topic: kafka topic: "admin", "pla", "k8scluster", "vca", "k8srepo", "ns", "nsi",
            "vim_account", "wim_account" or "sdn"
        :param command: operation for that topic ("instantiate", "terminate", "create", ...)
        :param params: command payload; usually the operation record (dict) or a record id (str)
        :raise LcmExceptionExit: when an "exit" command is received, to stop the kafka_read loop
        """
        order_id = 1

        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        # any received message proves the kafka connection is alive: reset the error counters
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        # generic commands valid for every topic
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            # commented-out command: ignore
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                # self-ping sent by kafka_ping; ignore pings addressed to other lcm workers
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    # refresh the healthcheck timestamp file read by the '--health-check' mode
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "vca":
            if command == "create" or command == "created":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.create(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                return
            elif command == "delete" or command == "deleted":
                vca_id = params.get("_id")
                task = asyncio.ensure_future(self.vca.delete(params, order_id))
                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any in-progress task for this NS before launching the termination
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "vca_status_refresh":
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.vca_status_refresh(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                # debugging helper: print the NS record to stdout
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): self.lcm_ns_tasks is not defined anywhere in this class; an
                    # AttributeError here is swallowed by the except below -- confirm intended attribute
                    print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                # debugging helper: print the NSI record to stdout
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): self.lcm_netslice_tasks is not defined anywhere in this class; an
                    # AttributeError here is swallowed by the except below -- confirm intended attribute
                    print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                          "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with the new-generation RO the account is handled by RO itself: no task needed
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
492
493 async def kafka_read(self):
494 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
495 # future = asyncio.Future()
496 self.consecutive_errors = 0
497 self.first_start = True
498 while self.consecutive_errors < 10:
499 try:
500 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "vca", "k8srepo", "pla")
501 topics_admin = ("admin", )
502 await asyncio.gather(
503 self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
504 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
505 )
506
507 except LcmExceptionExit:
508 self.logger.debug("Bye!")
509 break
510 except Exception as e:
511 # if not first_start is the first time after starting. So leave more time and wait
512 # to allow kafka starts
513 if self.consecutive_errors == 8 if not self.first_start else 30:
514 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
515 raise
516 self.consecutive_errors += 1
517 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
518 wait_time = 2 if not self.first_start else 5
519 await asyncio.sleep(wait_time, loop=self.loop)
520
521 # self.logger.debug("Task kafka_read terminating")
522 self.logger.debug("Task kafka_read exit")
523
    def start(self):
        """
        Blocking entry point. Checks the RO version, instantiates the per-topic lifecycle modules,
        starts the prometheus task when configured, and runs the kafka_read/kafka_ping loops until
        exit. On return it closes the event loop and disconnects database, message bus and storage.
        """

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus)
        # the netslice module receives the ns module to drive its nested network services
        self.netslice = netslice.NetsliceLcm(self.msg, self.lcm_tasks, self.config, self.loop,
                                             self.ns)
        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)

        # configure tsdb prometheus
        if self.prometheus:
            self.loop.run_until_complete(self.prometheus.start())

        # run consumer and self-ping concurrently; returns when kafka_read receives 'exit'
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
567
568 def read_config_file(self, config_file):
569 # TODO make a [ini] + yaml inside parser
570 # the configparser library is not suitable, because it does not admit comments at the end of line,
571 # and not parse integer or boolean
572 try:
573 # read file as yaml format
574 with open(config_file) as f:
575 conf = yaml.load(f, Loader=yaml.Loader)
576 # Ensure all sections are not empty
577 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
578 if not conf.get(k):
579 conf[k] = {}
580
581 # read all environ that starts with OSMLCM_
582 for k, v in environ.items():
583 if not k.startswith("OSMLCM_"):
584 continue
585 subject, _, item = k[7:].lower().partition("_")
586 if not item:
587 continue
588 if subject in ("ro", "vca"):
589 # put in capital letter
590 subject = subject.upper()
591 try:
592 if item == "port" or subject == "timeout":
593 conf[subject][item] = int(v)
594 else:
595 conf[subject][item] = v
596 except Exception as e:
597 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
598
599 # backward compatibility of VCA parameters
600
601 if 'pubkey' in conf["VCA"]:
602 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
603 if 'cacert' in conf["VCA"]:
604 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
605 if 'apiproxy' in conf["VCA"]:
606 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
607
608 if 'enableosupgrade' in conf["VCA"]:
609 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
610 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
611 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
612 conf["VCA"]['enable_os_upgrade'] = False
613 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
614 conf["VCA"]['enable_os_upgrade'] = True
615
616 if 'aptmirror' in conf["VCA"]:
617 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
618
619 return conf
620 except Exception as e:
621 self.logger.critical("At config file '{}': {}".format(config_file, e))
622 exit(1)
623
624 @staticmethod
625 def get_process_id():
626 """
627 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
628 will provide a random one
629 :return: Obtained ID
630 """
631 # Try getting docker id. If fails, get pid
632 try:
633 with open("/proc/self/cgroup", "r") as f:
634 text_id_ = f.readline()
635 _, _, text_id = text_id_.rpartition("/")
636 text_id = text_id.replace('\n', '')[:12]
637 if text_id:
638 return text_id
639 except Exception:
640 pass
641 # Return a random id
642 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
643
644
def usage():
    """Print command-line help for the lcm executable to stdout."""
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
653
654
if __name__ == '__main__':

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration
        # -h
        # -c value
        # --config value
        # --help
        # --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                # NOTE(review): execution continues past this call unless health_check exits the
                # process itself -- confirm against osm_lcm.lcm_hc.health_check
                from osm_lcm.lcm_hc import health_check
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            # no -c given: look for a default config next to this module, in cwd, then /etc/osm
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)