Feature 7184 New Generation RO enhancemnt
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20
21 # DEBUG WITH PDB
22 import os
23 import pdb
24
25 import asyncio
26 import yaml
27 import logging
28 import logging.handlers
29 import getopt
30 import sys
31
32 from osm_lcm import ns, prometheus, vim_sdn, netslice
33 from osm_lcm.ng_ro import NgRoException, NgRoClient
34 from osm_lcm.ROclient import ROClient, ROClientException
35
36 from time import time
37 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
38 from osm_lcm import version as lcm_version, version_date as lcm_version_date
39
40 from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
41 from osm_common import version as common_version
42 from osm_common.dbbase import DbException
43 from osm_common.fsbase import FsException
44 from osm_common.msgbase import MsgException
45 from os import environ, path
46 from random import choice as random_choice
47 from n2vc import version as n2vc_version
48
# when OSMLCM_PDB_DEBUG is set in the environment, break into the pdb debugger at import time
if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
    pdb.set_trace()


__author__ = "Alfonso Tierno"
# minimum versions of RO, N2VC and osm_common this LCM is compatible with
min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"

min_common_version = "0.1.19"
# file touched on every received kafka ping; inspected by the --health-check mode
health_check_file = path.expanduser("~") + "/time_last_ping"  # TODO find better location for this file
59
60
class Lcm:
    """OSM Life Cycle Management module: connects to database, storage and kafka message bus,
    and dispatches NS/NSI/VIM/WIM/SDN/K8s lifecycle operations to the corresponding engines."""

    ping_interval_pace = 120  # how many time ping is send once is confirmed all is running
    ping_interval_boot = 5    # how many time ping is sent when booting
    # for each section at lcm.cfg, the logger name to be used
    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
68 def __init__(self, config_file, loop=None):
69 """
70 Init, Connect to database, filesystem storage, and messaging
71 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
72 :return: None
73 """
74 self.db = None
75 self.msg = None
76 self.msg_admin = None
77 self.fs = None
78 self.pings_not_received = 1
79 self.consecutive_errors = 0
80 self.first_start = False
81
82 # logging
83 self.logger = logging.getLogger('lcm')
84 # get id
85 self.worker_id = self.get_process_id()
86 # load configuration
87 config = self.read_config_file(config_file)
88 self.config = config
89 self.config["ro_config"] = {
90 "ng": config["RO"].get("ng", False),
91 "uri": config["RO"].get("uri"),
92 "tenant": config.get("tenant", "osm"),
93 "logger_name": "lcm.roclient",
94 "loglevel": config["RO"].get("loglevel", "ERROR"),
95 }
96 if not self.config["ro_config"]["uri"]:
97 self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
98 elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
99 # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
100 index = self.config["ro_config"]["uri"][-1].rfind("/")
101 self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1]
102
103 self.loop = loop or asyncio.get_event_loop()
104 self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None
105
106 # logging
107 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
108 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
109 config["database"]["logger_name"] = "lcm.db"
110 config["storage"]["logger_name"] = "lcm.fs"
111 config["message"]["logger_name"] = "lcm.msg"
112 if config["global"].get("logfile"):
113 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
114 maxBytes=100e6, backupCount=9, delay=0)
115 file_handler.setFormatter(log_formatter_simple)
116 self.logger.addHandler(file_handler)
117 if not config["global"].get("nologging"):
118 str_handler = logging.StreamHandler()
119 str_handler.setFormatter(log_formatter_simple)
120 self.logger.addHandler(str_handler)
121
122 if config["global"].get("loglevel"):
123 self.logger.setLevel(config["global"]["loglevel"])
124
125 # logging other modules
126 for k1, logname in self.cfg_logger_name.items():
127 config[k1]["logger_name"] = logname
128 logger_module = logging.getLogger(logname)
129 if config[k1].get("logfile"):
130 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
131 maxBytes=100e6, backupCount=9, delay=0)
132 file_handler.setFormatter(log_formatter_simple)
133 logger_module.addHandler(file_handler)
134 if config[k1].get("loglevel"):
135 logger_module.setLevel(config[k1]["loglevel"])
136 self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))
137
138 # check version of N2VC
139 # TODO enhance with int conversion or from distutils.version import LooseVersion
140 # or with list(map(int, version.split(".")))
141 if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
142 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
143 n2vc_version, min_n2vc_version))
144 # check version of common
145 if versiontuple(common_version) < versiontuple(min_common_version):
146 raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
147 common_version, min_common_version))
148
149 try:
150 # TODO check database version
151 if config["database"]["driver"] == "mongo":
152 self.db = dbmongo.DbMongo()
153 self.db.db_connect(config["database"])
154 elif config["database"]["driver"] == "memory":
155 self.db = dbmemory.DbMemory()
156 self.db.db_connect(config["database"])
157 else:
158 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
159 config["database"]["driver"]))
160
161 if config["storage"]["driver"] == "local":
162 self.fs = fslocal.FsLocal()
163 self.fs.fs_connect(config["storage"])
164 elif config["storage"]["driver"] == "mongo":
165 self.fs = fsmongo.FsMongo()
166 self.fs.fs_connect(config["storage"])
167 else:
168 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
169 config["storage"]["driver"]))
170
171 # copy message configuration in order to remove 'group_id' for msg_admin
172 config_message = config["message"].copy()
173 config_message["loop"] = self.loop
174 if config_message["driver"] == "local":
175 self.msg = msglocal.MsgLocal()
176 self.msg.connect(config_message)
177 self.msg_admin = msglocal.MsgLocal()
178 config_message.pop("group_id", None)
179 self.msg_admin.connect(config_message)
180 elif config_message["driver"] == "kafka":
181 self.msg = msgkafka.MsgKafka()
182 self.msg.connect(config_message)
183 self.msg_admin = msgkafka.MsgKafka()
184 config_message.pop("group_id", None)
185 self.msg_admin.connect(config_message)
186 else:
187 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
188 config["message"]["driver"]))
189 except (DbException, FsException, MsgException) as e:
190 self.logger.critical(str(e), exc_info=True)
191 raise LcmException(str(e))
192
193 # contains created tasks/futures to be able to cancel
194 self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
195
196 if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
197 if self.config["tsdb"]["driver"] == "prometheus":
198 self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.db, self.loop)
199 else:
200 raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
201 config["tsdb"]["driver"]))
202 else:
203 self.prometheus = None
204
205 async def check_RO_version(self):
206 tries = 14
207 last_error = None
208 while True:
209 ro_uri = self.config["ro_config"]["uri"]
210 try:
211 # try new RO, if fail old RO
212 try:
213 self.config["ro_config"]["uri"] = ro_uri + "ro"
214 ro_server = NgRoClient(self.loop, **self.config["ro_config"])
215 ro_version = await ro_server.get_version()
216 self.config["ro_config"]["ng"] = True
217 except Exception:
218 self.config["ro_config"]["uri"] = ro_uri + "openmano"
219 ro_server = ROClient(self.loop, **self.config["ro_config"])
220 ro_version = await ro_server.get_version()
221 self.config["ro_config"]["ng"] = False
222 if versiontuple(ro_version) < versiontuple(min_RO_version):
223 raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
224 ro_version, min_RO_version))
225 self.logger.info("Connected to RO version {} new-generation version {}".
226 format(ro_version, self.config["ro_config"]["ng"]))
227 return
228 except (ROClientException, NgRoException) as e:
229 self.config["ro_config"]["uri"] = ro_uri
230 tries -= 1
231 error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
232 if tries <= 0:
233 self.logger.critical(error_text)
234 raise LcmException(error_text)
235 if last_error != error_text:
236 last_error = error_text
237 self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
238 await asyncio.sleep(5)
239
240 async def test(self, param=None):
241 self.logger.debug("Starting/Ending test task: {}".format(param))
242
243 async def kafka_ping(self):
244 self.logger.debug("Task kafka_ping Enter")
245 consecutive_errors = 0
246 first_start = True
247 kafka_has_received = False
248 self.pings_not_received = 1
249 while True:
250 try:
251 await self.msg_admin.aiowrite(
252 "admin", "ping",
253 {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
254 self.loop)
255 # time between pings are low when it is not received and at starting
256 wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
257 if not self.pings_not_received:
258 kafka_has_received = True
259 self.pings_not_received += 1
260 await asyncio.sleep(wait_time, loop=self.loop)
261 if self.pings_not_received > 10:
262 raise LcmException("It is not receiving pings from Kafka bus")
263 consecutive_errors = 0
264 first_start = False
265 except LcmException:
266 raise
267 except Exception as e:
268 # if not first_start is the first time after starting. So leave more time and wait
269 # to allow kafka starts
270 if consecutive_errors == 8 if not first_start else 30:
271 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
272 raise
273 consecutive_errors += 1
274 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
275 wait_time = 2 if not first_start else 5
276 await asyncio.sleep(wait_time, loop=self.loop)
277
    def kafka_read_callback(self, topic, command, params):
        """
        Callback invoked for every kafka message received by kafka_read. Dispatches the message
        to the proper topic engine (ns, netslice, vim, wim, sdn, k8scluster, k8srepo, pla),
        creating an asyncio task and registering it at self.lcm_tasks so it can be cancelled.
        :param topic: kafka topic the message was read from
        :param command: action string (e.g. "instantiate", "delete", "ping", ...)
        :param params: message payload; normally a dict with the operation parameters
        :return: None
        :raises LcmExceptionExit: when the special "exit" command is received
        """
        order_id = 1

        # skip logging the periodic pings
        # NOTE(review): with 'and', NO message on the 'admin' topic is ever logged and no 'ping'
        # command on any topic is logged; presumably only the admin+ping combination was meant to
        # be skipped -- confirm intent
        if topic != "admin" and command != "ping":
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        # any received message proves kafka works: reset the error counters used by kafka_read
        self.consecutive_errors = 0
        self.first_start = False
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            # self-addressed ping used for the kafka health check
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                # only attend the pings sent by this very worker
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    # touch the healthcheck file so the --health-check mode can detect liveness
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "pla":
            if command == "placement":
                self.ns.update_nsrs_with_pla_result(params)
            return
        elif topic == "k8scluster":
            if command == "create" or command == "created":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8scluster_id = params.get("_id")
                task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                return
        elif topic == "k8srepo":
            if command == "create" or command == "created":
                k8srepo_id = params.get("_id")
                self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
                task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
                return
            elif command == "delete" or command == "deleted":
                k8srepo_id = params.get("_id")
                task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
                self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
                return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel any pending task on this NS before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                # debug helper: dump the NS record to stdout
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    # NOTE(review): self.lcm_ns_tasks is not defined anywhere in this class (tasks
                    # live at self.lcm_tasks); this raises AttributeError, which is caught below and
                    # mis-reported as 'nsr ... not found' -- confirm and fix separately
                    print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                # debug helper: dump the network slice record to stdout
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    # NOTE(review): self.lcm_netslice_tasks does not exist either (see ns/show above)
                    print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command in ("create", "created"):
                # with the new-generation RO, vim registration is done by RO itself
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.create(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.wim.create(params, order_id))
                    self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command in ("create", "created"):
                if not self.config["ro_config"].get("ng"):
                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
                    self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete" or command == "deleted":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command in ("edit", "edited"):
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
487
488 async def kafka_read(self):
489 self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
490 # future = asyncio.Future()
491 self.consecutive_errors = 0
492 self.first_start = True
493 while self.consecutive_errors < 10:
494 try:
495 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
496 topics_admin = ("admin", )
497 await asyncio.gather(
498 self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
499 self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
500 )
501
502 except LcmExceptionExit:
503 self.logger.debug("Bye!")
504 break
505 except Exception as e:
506 # if not first_start is the first time after starting. So leave more time and wait
507 # to allow kafka starts
508 if self.consecutive_errors == 8 if not self.first_start else 30:
509 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
510 raise
511 self.consecutive_errors += 1
512 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
513 wait_time = 2 if not self.first_start else 5
514 await asyncio.sleep(wait_time, loop=self.loop)
515
516 # self.logger.debug("Task kafka_read terminating")
517 self.logger.debug("Task kafka_read exit")
518
    def start(self):
        """Synchronous main entry point: checks the RO server, instantiates every topic engine,
        runs the kafka read and ping loops until exit, and finally closes the loop and
        disconnects from database, message bus and storage."""

        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus)
        # NetsliceLcm receives the NsLcm instance: slice operations delegate on NS operations
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop,
                                             self.ns)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)

        # configure tsdb prometheus
        if self.prometheus:
            self.loop.run_until_complete(self.prometheus.start())

        # run both loops concurrently until kafka_read finishes (exit command or fatal error)
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
561
562 def read_config_file(self, config_file):
563 # TODO make a [ini] + yaml inside parser
564 # the configparser library is not suitable, because it does not admit comments at the end of line,
565 # and not parse integer or boolean
566 try:
567 # read file as yaml format
568 with open(config_file) as f:
569 conf = yaml.load(f, Loader=yaml.Loader)
570 # Ensure all sections are not empty
571 for k in ("global", "timeout", "RO", "VCA", "database", "storage", "message"):
572 if not conf.get(k):
573 conf[k] = {}
574
575 # read all environ that starts with OSMLCM_
576 for k, v in environ.items():
577 if not k.startswith("OSMLCM_"):
578 continue
579 subject, _, item = k[7:].lower().partition("_")
580 if not item:
581 continue
582 if subject in ("ro", "vca"):
583 # put in capital letter
584 subject = subject.upper()
585 try:
586 if item == "port" or subject == "timeout":
587 conf[subject][item] = int(v)
588 else:
589 conf[subject][item] = v
590 except Exception as e:
591 self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))
592
593 # backward compatibility of VCA parameters
594
595 if 'pubkey' in conf["VCA"]:
596 conf["VCA"]['public_key'] = conf["VCA"].pop('pubkey')
597 if 'cacert' in conf["VCA"]:
598 conf["VCA"]['ca_cert'] = conf["VCA"].pop('cacert')
599 if 'apiproxy' in conf["VCA"]:
600 conf["VCA"]['api_proxy'] = conf["VCA"].pop('apiproxy')
601
602 if 'enableosupgrade' in conf["VCA"]:
603 conf["VCA"]['enable_os_upgrade'] = conf["VCA"].pop('enableosupgrade')
604 if isinstance(conf["VCA"].get('enable_os_upgrade'), str):
605 if conf["VCA"]['enable_os_upgrade'].lower() == 'false':
606 conf["VCA"]['enable_os_upgrade'] = False
607 elif conf["VCA"]['enable_os_upgrade'].lower() == 'true':
608 conf["VCA"]['enable_os_upgrade'] = True
609
610 if 'aptmirror' in conf["VCA"]:
611 conf["VCA"]['apt_mirror'] = conf["VCA"].pop('aptmirror')
612
613 return conf
614 except Exception as e:
615 self.logger.critical("At config file '{}': {}".format(config_file, e))
616 exit(1)
617
618 @staticmethod
619 def get_process_id():
620 """
621 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
622 will provide a random one
623 :return: Obtained ID
624 """
625 # Try getting docker id. If fails, get pid
626 try:
627 with open("/proc/self/cgroup", "r") as f:
628 text_id_ = f.readline()
629 _, _, text_id = text_id_.rpartition("/")
630 text_id = text_id.replace('\n', '')[:12]
631 if text_id:
632 return text_id
633 except Exception:
634 pass
635 # Return a random id
636 return ''.join(random_choice("0123456789abcdef") for _ in range(12))
637
638
def usage():
    """Print the command-line help of the lcm module to stdout."""
    help_text = """Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
        -h|--help: shows this help
        """.format(sys.argv[0])
    print(help_text)
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
647
648
if __name__ == '__main__':

    try:
        # print("SYS.PATH='{}'".format(sys.path))
        # load parameters and configuration:
        # -h / --help, -c value / --config value, --health-check
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                from osm_lcm.lcm_hc import health_check
                # NOTE(review): presumably health_check terminates the process itself, otherwise
                # execution falls through to a normal LCM startup -- confirm against lcm_hc
                health_check(health_check_file, Lcm.ping_interval_pace)
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                # NOTE(review): assert is stripped under 'python -O'; a raise would be safer
                assert False, "Unhandled option"

        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                exit(1)
        else:
            # no -c given: look for a default config next to this module, locally, or at /etc/osm
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        exit(1)