Prepare LCM tasks for HA
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2018 Telefonica S.A.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 ##
19
20 import asyncio
21 import yaml
22 import logging
23 import logging.handlers
24 import getopt
25 import sys
26 import ROclient
27 import ns
28 import vim_sdn
29 import netslice
30 from time import time, sleep
31 from lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
32
33 # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient
34 from osm_common import dbmemory, dbmongo, fslocal, msglocal, msgkafka
35 from osm_common import version as common_version
36 from osm_common.dbbase import DbException
37 from osm_common.fsbase import FsException
38 from osm_common.msgbase import MsgException
39 from os import environ, path
40 from random import choice as random_choice
41 from n2vc import version as n2vc_version
42
43
__author__ = "Alfonso Tierno"

# Minimum versions of the components LCM talks to; they are checked when LCM starts
min_RO_version = [0, 6, 3]
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"

# Hard-coded LCM version. If LCM is installed as a library these could be taken from osm_lcm/__init__.py instead
lcm_version = '0.1.41'
lcm_version_date = '2019-06-19'

# File where the time of the last self-ping received back from kafka is written; read by --health-check
# TODO find better location for this file
health_check_file = "{}/time_last_ping".format(path.expanduser("~"))
52
53
class Lcm:
    """OSM Lifecycle Manager (LCM) worker.

    Connects to the database, storage and message bus given in the configuration file, reads operations from the
    kafka bus and runs them as asyncio tasks through the ns, netslice, vim, wim and sdn handlers. A self-ping on the
    "admin" topic is used to detect a broken kafka bus and to refresh the health-check file.
    """

    ping_interval_pace = 120  # seconds between pings once a ping has been received back (all is running)
    ping_interval_boot = 5  # seconds between pings while booting, until the first ping is received back
    # Consecutive kafka errors after which kafka_read/kafka_ping give up. A higher limit is used while booting
    # (nothing processed yet) to leave kafka time to start
    max_kafka_errors_boot = 30
    max_kafka_errors_pace = 8

    def __init__(self, config_file, loop=None):
        """
        Init, connect to database, filesystem storage, and messaging
        :param config_file: path of the yaml configuration file. It must contain the sections 'global', 'database',
            'storage', 'message', 'RO' and 'VCA'. Values can be overridden with OSMLCM_* environment variables
        :param loop: asyncio event loop to use. By default asyncio.get_event_loop()
        :return: None
        :raises LcmException: on incompatible library versions, invalid drivers or connection errors
        """

        self.db = None
        self.msg = None
        self.msg_admin = None
        self.fs = None
        self.pings_not_received = 1
        self.consecutive_errors = 0
        self.first_start = False

        # logging
        self.logger = logging.getLogger('lcm')
        # get id
        self.worker_id = self.get_process_id()
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_config = {
            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
            # NOTE(review): reads a top-level 'tenant' key, not config["RO"]["tenant"]; confirm this is intended
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.ROclient",
            "loglevel": "ERROR",
        }

        self.vca_config = config["VCA"]

        self.loop = loop or asyncio.get_event_loop()

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        if config["global"].get("logfile"):
            file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(file_handler)
        if not config["global"].get("nologging"):
            str_handler = logging.StreamHandler()
            str_handler.setFormatter(log_formatter_simple)
            self.logger.addHandler(str_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules. This also sets the 'logger_name' used by the db, fs and msg drivers
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if config[k1].get("logfile"):
                file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                    maxBytes=100e6, backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if config[k1].get("loglevel"):
                logger_module.setLevel(config[k1]["loglevel"])
        self.logger.critical("starting osm/lcm version {} {}".format(lcm_version, lcm_version_date))

        # check version of N2VC
        # TODO enhance with int conversion or from distutils.version import LooseVersion
        # or with list(map(int, version.split(".")))
        if versiontuple(n2vc_version) < versiontuple(min_n2vc_version):
            raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                n2vc_version, min_n2vc_version))
        # check version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version))

        try:
            # TODO check database version
            if config["database"]["driver"] == "mongo":
                self.db = dbmongo.DbMongo()
                self.db.db_connect(config["database"])
            elif config["database"]["driver"] == "memory":
                self.db = dbmemory.DbMemory()
                self.db.db_connect(config["database"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    config["database"]["driver"]))

            if config["storage"]["driver"] == "local":
                self.fs = fslocal.FsLocal()
                self.fs.fs_connect(config["storage"])
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    config["storage"]["driver"]))

            config_message = config["message"].copy()
            config_message["loop"] = self.loop
            if config_message["driver"] == "local":
                self.msg = msglocal.MsgLocal()
                self.msg.connect(config_message)
                self.msg_admin = msglocal.MsgLocal()
                # the admin connection is opened without 'group_id'
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            elif config_message["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
                self.msg.connect(config_message)
                self.msg_admin = msgkafka.MsgKafka()
                config_message.pop("group_id", None)
                self.msg_admin.connect(config_message)
            else:
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config["message"]["driver"]))
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)

        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
                                             self.vca_config, self.loop)
        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)

    async def check_RO_version(self):
        """Check that RO is reachable and that its version is at least min_RO_version.

        :raises LcmException: if RO cannot be contacted or its version is too old
        """
        try:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            RO_version = await RO.get_version()
            # compare as tuples so it works whether get_version returns a list or a tuple
            if tuple(RO_version) < tuple(min_RO_version):
                raise LcmException("Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format(
                    *RO_version, *min_RO_version
                ))
        except ROclient.ROClientException as e:
            error_text = "Error while connecting to osm/RO " + str(e)
            self.logger.critical(error_text, exc_info=True)
            raise LcmException(error_text) from e

    async def test(self, param=None):
        """Dummy task launched by the "test" command; it only logs."""
        self.logger.debug("Starting/Ending test task: {}".format(param))

    def _too_many_kafka_errors(self, consecutive_errors, first_start):
        """Tell if a kafka task must give up after 'consecutive_errors' errors in a row.

        The limit is chosen explicitly before comparing: an inline 'a == 8 if not b else 30' would parse as
        '(a == 8) if not b else 30', which is always true while booting.
        :param consecutive_errors: number of consecutive errors so far
        :param first_start: True while booting, when a higher limit is allowed
        :return: True if the limit has been reached
        """
        limit = self.max_kafka_errors_boot if first_start else self.max_kafka_errors_pace
        return consecutive_errors >= limit

    async def kafka_ping(self):
        """Periodically publish a ping on the "admin" topic addressed to this same worker.

        kafka_read_callback sets self.pings_not_received to 0 when the ping is received back. Pings are sent every
        ping_interval_boot seconds until the first one is received back, and every ping_interval_pace afterwards.
        :raises LcmException: when pings_not_received exceeds 10
        :raises Exception: the last error, after too many consecutive errors sending the ping
        """
        self.logger.debug("Task kafka_ping Enter")
        consecutive_errors = 0
        first_start = True
        kafka_has_received = False
        self.pings_not_received = 1
        while True:
            try:
                await self.msg_admin.aiowrite(
                    "admin", "ping",
                    {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version},
                    self.loop)
                # time between pings are low when it is not received and at starting
                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                if not self.pings_not_received:
                    kafka_has_received = True
                self.pings_not_received += 1
                await asyncio.sleep(wait_time, loop=self.loop)
                if self.pings_not_received > 10:
                    raise LcmException("It is not receiving pings from Kafka bus")
                consecutive_errors = 0
                first_start = False
            except LcmException:
                raise
            except Exception as e:
                # while first_start (nothing sent yet) more errors are tolerated, to allow kafka to start
                if self._too_many_kafka_errors(consecutive_errors, first_start):
                    self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                    raise
                consecutive_errors += 1
                self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
                wait_time = 2 if not first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

    def _get_registered_tasks(self, topic, _id):
        """Return the tasks registered at self.lcm_tasks for topic and _id, to be printed by the "show" commands.

        NOTE(review): assumes TaskRegistry keeps a 'task_registry' dict keyed by topic and then by _id; confirm
        against lcm_utils. Any other layout yields None instead of an exception.
        :return: the registered tasks, or None
        """
        registry = getattr(self.lcm_tasks, "task_registry", None)
        if not isinstance(registry, dict):
            return None
        topic_tasks = registry.get(topic)
        if not isinstance(topic_tasks, dict):
            return None
        return topic_tasks.get(_id)

    def kafka_read_callback(self, topic, command, params):
        """Dispatch one message read from the message bus.

        :param topic: "admin", "ns", "nsi", "vim_account", "wim_account" or "sdn"
        :param command: operation requested, e.g. "instantiate", "terminate", "create", "show", "ping"
        :param params: message content. For ns/nsi operations an operation dict with "_id" and the instance id;
            for vim/wim/sdn a dict with "_id"; for "show" on ns/nsi the record id
        :raises LcmExceptionExit: when command is "exit"
        """
        order_id = 1

        # log everything except the self-pings on the admin topic, which are too frequent
        if not (topic == "admin" and command == "ping"):
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
        # a message has been received: reset the error accounting of kafka_read
        self.consecutive_errors = 0
        self.first_start = False
        # NOTE(review): order_id is always 2 here; it is used as operation id for vim/wim/sdn tasks
        order_id += 1
        if command == "exit":
            raise LcmExceptionExit
        elif command.startswith("#"):
            return
        elif command == "echo":
            # just for test
            print(params)
            sys.stdout.flush()
            return
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
            return

        if topic == "admin":
            if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                # ignore pings sent by other workers
                if params.get("worker_id") != self.worker_id:
                    return
                self.pings_not_received = 0
                try:
                    with open(health_check_file, "w") as f:
                        f.write(str(time()))
                except Exception as e:
                    self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
            return
        elif topic == "ns":
            if command == "instantiate":
                # self.logger.debug("Deploying NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Deleting NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                # cancel pending tasks of this ns before terminating it
                self.lcm_tasks.cancel(topic, nsr_id)
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
                return
            elif command == "action":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
                return
            elif command == "scale":
                # self.logger.debug("Update NS {}".format(nsr_id))
                nslcmop = params
                nslcmop_id = nslcmop["_id"]
                nsr_id = nslcmop["nsInstanceId"]
                task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                return
            elif command == "show":
                nsr_id = params
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                    db_nsr["detailed-status"],
                                    db_nsr["_admin"]["deployed"], self._get_registered_tasks("ns", nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
            if command == "instantiate":
                # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
                return
            elif command == "terminate":
                # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
                nsilcmop = params
                nsilcmop_id = nsilcmop["_id"]  # slice operation id
                nsir_id = nsilcmop["netsliceInstanceId"]  # slice record id
                self.lcm_tasks.cancel(topic, nsir_id)
                task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
                self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
                return
            elif command == "show":
                nsir_id = params
                try:
                    db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
                    print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
                          "\n detailed-status: {}\n deploy: {}\n tasks: {}"
                          "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
                                    db_nsir["detailed-status"],
                                    db_nsir["_admin"]["deployed"], self._get_registered_tasks("nsi", nsir_id)))
                except Exception as e:
                    print("nsir {} not found: {}".format(nsir_id, e))
                sys.stdout.flush()
                return
            elif command == "deleted":
                return  # TODO cleaning of task just in case should be done
            elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                return
        elif topic == "vim_account":
            vim_id = params["_id"]
            if command == "create":
                task = asyncio.ensure_future(self.vim.create(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                return
            elif command == "delete":
                self.lcm_tasks.cancel(topic, vim_id)
                task = asyncio.ensure_future(self.vim.delete(vim_id, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
                return
            elif command == "show":
                print("not implemented show with vim_account")
                sys.stdout.flush()
                return
            elif command == "edit":
                task = asyncio.ensure_future(self.vim.edit(params, order_id))
                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                return
        elif topic == "wim_account":
            wim_id = params["_id"]
            if command == "create":
                task = asyncio.ensure_future(self.wim.create(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                return
            elif command == "delete":
                self.lcm_tasks.cancel(topic, wim_id)
                task = asyncio.ensure_future(self.wim.delete(wim_id, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
                return
            elif command == "show":
                print("not implemented show with wim_account")
                sys.stdout.flush()
                return
            elif command == "edit":
                task = asyncio.ensure_future(self.wim.edit(params, order_id))
                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
                return
        elif topic == "sdn":
            _sdn_id = params["_id"]
            if command == "create":
                task = asyncio.ensure_future(self.sdn.create(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                return
            elif command == "delete":
                self.lcm_tasks.cancel(topic, _sdn_id)
                task = asyncio.ensure_future(self.sdn.delete(_sdn_id, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
                return
            elif command == "edit":
                task = asyncio.ensure_future(self.sdn.edit(params, order_id))
                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
                return
        self.logger.critical("unknown topic {} and command '{}'".format(topic, command))

    async def kafka_read(self):
        """Read the LCM topics and the "admin" topic, passing every message to kafka_read_callback.

        Retries on errors. It gives up, re-raising the last exception, after max_kafka_errors_boot consecutive
        errors while booting or max_kafka_errors_pace afterwards. It returns when an "exit" command is received.
        """
        self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id))
        # future = asyncio.Future()
        self.consecutive_errors = 0
        self.first_start = True
        # Ends on "exit" (break) or on too many errors (raise). kafka_read_callback resets consecutive_errors,
        # so only errors in a row count
        while True:
            try:
                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi")
                topics_admin = ("admin", )
                await asyncio.gather(
                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
                    self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                )

            except LcmExceptionExit:
                self.logger.debug("Bye!")
                break
            except Exception as e:
                # while first_start (nothing received yet) more errors are tolerated, to allow kafka to start
                if self._too_many_kafka_errors(self.consecutive_errors, self.first_start):
                    self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                    raise
                self.consecutive_errors += 1
                self.logger.error("Task kafka_read retrying after Exception {}".format(e))
                wait_time = 2 if not self.first_start else 5
                await asyncio.sleep(wait_time, loop=self.loop)

        self.logger.debug("Task kafka_read exit")

    def start(self):
        """Check the RO version and run kafka_read and kafka_ping on the event loop.

        When both have ended, close the loop and disconnect from database, message bus and storage.
        """
        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.lcm_tasks.cancel("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.lcm_tasks.cancel("ALL", "ALL")
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.msg_admin:
            self.msg_admin.disconnect()
        if self.fs:
            self.fs.fs_disconnect()

    def read_config_file(self, config_file):
        """Load the yaml configuration file and apply the OSMLCM_* environment overrides.

        An environment variable OSMLCM_<SECTION>_..._<KEY> (lowercased; the sections "ro" and "vca" are upper-cased)
        sets config[SECTION]...[KEY]. Values for a "port" key are converted to int. Variables with fewer than three
        '_' separated items are ignored. Those whose intermediate sections do not exist are skipped with a warning.
        :param config_file: path of the yaml file
        :return: the configuration dictionary. On any error it logs and exits the process with status 1
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # safe_load: a configuration file only needs plain yaml types; yaml.load without Loader is unsafe
                # and is rejected by recent PyYAML versions
                conf = yaml.safe_load(f)
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                k_items = k.lower().split("_")
                if len(k_items) < 3:
                    continue
                if k_items[1] in ("ro", "vca"):
                    # put in capital letter
                    k_items[1] = k_items[1].upper()
                c = conf
                try:
                    for k_item in k_items[1:-1]:
                        c = c[k_item]
                    if k_items[-1] == "port":
                        c[k_items[-1]] = int(v)
                    else:
                        c[k_items[-1]] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            sys.exit(1)

    @staticmethod
    def get_process_id():
        """
        Obtain a unique ID for this process. The last component of the first line of /proc/self/cgroup (the docker
        container id when running inside docker), truncated to 12 characters. If not available, a random one
        :return: Obtained ID
        """
        # Try getting docker id. If fails, get pid
        try:
            with open("/proc/self/cgroup", "r") as f:
                text_id_ = f.readline()
                _, _, text_id = text_id_.rpartition("/")
                text_id = text_id.replace('\n', '')[:12]
                if text_id:
                    return text_id
        except Exception:
            pass
        # Return a random id
        return ''.join(random_choice("0123456789abcdef") for _ in range(12))
521
522
def usage():
    """Print the command line help of lcm."""
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: lcm.cfg at the script folder,
                                          ./lcm.cfg or /etc/osm/lcm.cfg)
        --health-check: do not run lcm, but check the time of the last ping received by the running lcm
                        to determine if it is healthy
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # TODO document --log-socket-host HOST (send logs to this host) and --log-socket-port PORT (default: 9022)
    # once they are implemented
531
532
def health_check(health_file=None, max_age=None, retries=2, retry_wait=6):
    """Exit with status 0 if a running lcm has recently received its own ping, 1 otherwise.

    The running lcm writes the time of the last self-ping received into the health check file.
    :param health_file: file to inspect. By default the module health_check_file
    :param max_age: maximum age in seconds of the stored time. By default Lcm.ping_interval_pace + 10
    :param retries: number of attempts before giving up
    :param retry_wait: seconds to wait between attempts
    :return: never returns; always terminates the process with sys.exit
    """
    if health_file is None:
        health_file = health_check_file
    if max_age is None:
        max_age = Lcm.ping_interval_pace + 10
    for attempt in range(retries):
        try:
            with open(health_file, "r") as f:
                last_received_ping = float(f.read())
        except (OSError, ValueError):
            pass  # missing, unreadable or corrupt file: retry, then report unhealthy
        else:
            if time() - last_received_ping < max_age:
                sys.exit(0)
        if attempt < retries - 1:
            sleep(retry_wait)
    sys.exit(1)
548
549
if __name__ == '__main__':
    try:
        # load parameters and configuration
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
        # TODO add "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            elif o == "--health-check":
                health_check()  # always terminates the process with exit status 0 (healthy) or 1
            else:
                # getopt already rejects unknown options; raise instead of assert, which is stripped with -O
                raise getopt.GetoptError("Unhandled option '{}'".format(o))
        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' not exist".format(config_file), file=sys.stderr)
                sys.exit(1)
        else:
            # search lcm.cfg next to this script, at the current folder and at /etc/osm
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                sys.exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)