lightweight exception capturing, logging
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 from dbbase import DbException
15 from fsbase import FsException
16 from msgbase import MsgException
17 from os import environ
18 from vca import DeployApplication, RemoveApplication
19 from copy import deepcopy
20 from http import HTTPStatus
21
class LcmException(Exception):
    """Error raised by the LCM module itself (e.g. invalid configuration or a failed backend connection)."""
24
25
class Lcm:
    """
    Life Cycle Manager. Reads orders from the message bus and creates/deletes network services (ns) at RO,
    launching the VCA charm deployment/removal tasks. The progress is stored at the 'nsrs' database records.
    """

    # Maximum time (seconds) that create_ns waits for the ns to become ACTIVE at RO
    RO_DEPLOY_TIMEOUT = 600
    # Time (seconds) between two consecutive checks of the ns status at RO
    RO_POLL_INTERVAL = 5

    def __init__(self, config_file):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the YAML configuration file. This method reads the top level sections
            'global', 'database', 'storage', 'message', 'RO' and 'VCA'
        :return: None
        :raises LcmException: if the configuration cannot be loaded, a driver is not valid or a connection fails
        """
        # Local import: a plain "import logging" does not guarantee that the "logging.handlers" submodule is loaded
        from logging.handlers import RotatingFileHandler

        # contains created tasks/futures to be able to cancel
        self.lcm_tasks = {}
        # logging
        self.logger = logging.getLogger('lcm')
        # load configuration
        config = self.read_config_file(config_file)
        if config is None:
            # read_config_file has already logged the reason
            raise LcmException("Cannot load configuration file '{}'".format(config_file))
        self.config = config
        self.ro_config = {
            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
            "tenant": config.get("tenant", "osm"),
            "logger_name": "lcm.ROclient",
            "loglevel": "ERROR",
        }
        self.vca = config["VCA"]  # TODO VCA
        self.loop = None

        # logging
        log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
        log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
        if "logfile" in config["global"]:
            main_handler = RotatingFileHandler(config["global"]["logfile"],
                                               maxBytes=int(100e6), backupCount=9, delay=0)
        else:
            main_handler = logging.StreamHandler()
        main_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(main_handler)

        if config["global"].get("loglevel"):
            self.logger.setLevel(config["global"]["loglevel"])

        # logging other modules. The logger name is injected in each section so that the drivers can use it
        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
            config[k1]["logger_name"] = logname
            logger_module = logging.getLogger(logname)
            if "logfile" in config[k1]:
                file_handler = RotatingFileHandler(config[k1]["logfile"],
                                                   maxBytes=int(100e6), backupCount=9, delay=0)
                file_handler.setFormatter(log_formatter_simple)
                logger_module.addHandler(file_handler)
            if "loglevel" in config[k1]:
                logger_module.setLevel(config[k1]["loglevel"])

        try:
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.DbMongo()
            elif db_driver == "memory":
                self.db = dbmemory.DbMemory()
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
            self.db.db_connect(config["database"])

            fs_driver = config["storage"]["driver"]
            if fs_driver == "local":
                self.fs = fslocal.FsLocal()
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
            self.fs.fs_connect(config["storage"])

            msg_driver = config["message"]["driver"]
            if msg_driver == "local":
                self.msg = msglocal.MsgLocal()
            elif msg_driver == "kafka":
                self.msg = msgkafka.MsgKafka()
            else:
                # report the offending 'message' driver (not the 'storage' one)
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
            self.msg.connect(config["message"])
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

    def update_nsr_db(self, nsr_id, nsr_desc):
        """
        Store the nsr content at database, collection 'nsrs'. A DbException is logged, not propagated
        :param nsr_id: nsr identity
        :param nsr_desc: full nsr content that replaces the stored one
        :return: None
        """
        try:
            self.db.replace("nsrs", nsr_id, nsr_desc)
        except DbException as e:
            self.logger.error("Updating nsr_id={}: {}".format(nsr_id, e))

    def vca_deploy_callback(self, db_nsr, vnf_index, status, task):
        """
        Done-callback of the VCA DeployApplication tasks. If the task ended with an exception, the nsr is marked
        as failed and stored at database. Nothing is done for cancelled, unfinished or successful tasks
        :param db_nsr: nsr database content; modified in place on failure
        :param vnf_index: member-vnf-index (string) of the vnf that the task configures
        :param status: not used by the moment
        :param task: the finished task/future
        :return: None. Exceptions are logged, never propagated
        """
        # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
        # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
        nsr_id = db_nsr["_id"]
        self.logger.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id))
        try:
            if task.cancelled() or not task.done():
                return
            exc = task.exception()
            if not exc:
                # TODO may be used to be called when VCA monitor status changes
                return
            nsr_lcm = db_nsr["_admin"]["deploy"]
            nsr_lcm["VCA"][vnf_index]['operational-status'] = "failed"
            db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, exc)
            db_nsr["config-status"] = "failed"
            self.update_nsr_db(nsr_id, db_nsr)
        except Exception as e:
            self.logger.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e), exc_info=True)

    async def create_ns(self, nsr_id, order_id):
        """
        Deploy a network service: creates the vnfds, the nsd and the ns at RO, waits until the ns is ACTIVE and
        launches one VCA DeployApplication task per vnf that declares a juju vnf-configuration.
        Progress and errors are written at the nsr database record
        :param nsr_id: nsr identity (database '_id')
        :param order_id: order identity, used as key to register the launched tasks at self.lcm_tasks
        :return: the deployment info dictionary on success; None if an exception (logged) has happened
        """
        logging_text = "Task create_ns={} ".format(nsr_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        exc = None
        step = "Getting nsr from db"  # current action; used at the error message if something fails
        try:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = db_nsr["nsd"]
            needed_vnfd = {}
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                if vnfd_id not in needed_vnfd:
                    step = "Getting vnfd={} from db".format(vnfd_id)
                    needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

            nsr_lcm = {
                "id": nsr_id,
                "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
                "nsr_ip": {},
                "VCA": {},
            }
            db_nsr["_admin"]["deploy"] = nsr_lcm
            db_nsr["detailed-status"] = "creating"
            db_nsr["operational-status"] = "init"

            RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)

            # get vnfds, instantiate at RO
            for vnfd_id, vnfd in needed_vnfd.items():
                step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
                self.logger.debug(logging_text + step)
                vnfd_id_RO = nsr_id + "." + vnfd_id[:200]

                # look if present
                vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
                if vnfd_list:
                    nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                    self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
                        vnfd_id, vnfd_list[0]["uuid"]))
                else:
                    vnfd_RO = deepcopy(vnfd)
                    vnfd_RO.pop("_id", None)
                    vnfd_RO.pop("_admin", None)
                    vnfd_RO["id"] = vnfd_id_RO
                    desc = await RO.create("vnfd", descriptor=vnfd_RO)
                    nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
                self.update_nsr_db(nsr_id, db_nsr)

            # create nsd at RO
            nsd_id = nsd["id"]
            step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
            self.logger.debug(logging_text + step)

            # RO identifiers are prefixed by the nsr_id, in the same way as it is done for the vnfds
            nsd_id_RO = nsr_id + "." + nsd_id[:200]
            nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
            if nsd_list:
                nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
                self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
                    nsd_id, nsd_list[0]["uuid"]))
            else:
                nsd_RO = deepcopy(nsd)
                nsd_RO["id"] = nsd_id_RO
                nsd_RO.pop("_id", None)
                nsd_RO.pop("_admin", None)
                # the constituent vnfds must point to the identifiers used when creating the vnfds at RO
                for c_vnf in nsd_RO["constituent-vnfd"]:
                    vnfd_id = c_vnf["vnfd-id-ref"]
                    c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
                desc = await RO.create("nsd", descriptor=nsd_RO)
                nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
            self.update_nsr_db(nsr_id, db_nsr)

            # Create ns at RO
            # if present use it unless in error status
            RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
            if RO_nsr_id:
                try:
                    step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
                    self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                    desc = await RO.show("ns", RO_nsr_id)
                except ROclient.ROClientException as e:
                    if e.http_code != HTTPStatus.NOT_FOUND:
                        raise
                    RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
                if RO_nsr_id:
                    ns_status, ns_status_info = RO.check_ns_status(desc)
                    nsr_lcm["RO"]["nsr_status"] = ns_status
                    if ns_status == "ERROR":
                        step = db_nsr["detailed-status"] = "Deleting ns at RO"
                        self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                        await RO.delete("ns", RO_nsr_id)
                        RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None

            if not RO_nsr_id:
                step = db_nsr["detailed-status"] = "Creating ns at RO"
                self.logger.debug(logging_text + step)

                desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
                                       scenario=nsr_lcm["RO"]["nsd_id"])
                RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
                nsr_lcm["RO"]["nsr_status"] = "BUILD"
            self.update_nsr_db(nsr_id, db_nsr)

            # wait until NS is ready
            step = ns_status_detailed = "Waiting ns ready at RO"
            db_nsr["detailed-status"] = ns_status_detailed
            self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
            deployment_timeout = self.RO_DEPLOY_TIMEOUT
            while deployment_timeout > 0:
                desc = await RO.show("ns", RO_nsr_id)
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
                    self.update_nsr_db(nsr_id, db_nsr)
                elif ns_status == "ACTIVE":
                    nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                    break
                else:
                    # an explicit exception instead of 'assert', because asserts are removed with 'python -O'
                    raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

                # the 'loop' argument of asyncio.sleep does not exist since python 3.10; the running loop is used
                await asyncio.sleep(self.RO_POLL_INTERVAL)
                deployment_timeout -= self.RO_POLL_INTERVAL
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns to be ready")
            db_nsr["detailed-status"] = "Configuring vnfr"
            self.update_nsr_db(nsr_id, db_nsr)

            vnfd_to_config = 0
            step = "Looking for needed vnfd to configure"
            self.logger.debug(logging_text + step)
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                vnf_index = str(c_vnf["member-vnf-index"])
                vnfd = needed_vnfd[vnfd_id]
                if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
                    nsr_lcm["VCA"][vnf_index] = {}
                    vnfd_to_config += 1
                    proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]

                    # Note: The charm needs to exist on disk at the location
                    # specified by charm_path.
                    base_folder = vnfd["_admin"]["storage"]
                    charm_path = "{}{}/{}/charms/{}".format(
                        base_folder["path"],
                        base_folder["folder"],
                        base_folder["file"],
                        proxy_charm
                    )
                    task = asyncio.ensure_future(
                        DeployApplication(
                            self.config['VCA'],
                            self.db,
                            db_nsr,
                            vnfd,
                            vnf_index,
                            charm_path,
                        )
                    )
                    task.add_done_callback(functools.partial(self.vca_deploy_callback, db_nsr, vnf_index, None))
                    # register the task to be able to cancel it later
                    order_tasks = self.lcm_tasks.setdefault(nsr_id, {}).setdefault(order_id, {})
                    order_tasks["create_charm:" + vnf_index] = task
            db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
            db_nsr["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config) if vnfd_to_config else "done"
            db_nsr["operational-status"] = "running"
            self.update_nsr_db(nsr_id, db_nsr)

            self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
            return nsr_lcm

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            # on failure, reflect the error at database (only possible if the nsr was read)
            if exc and db_nsr:
                db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr["operational-status"] = "failed"
                self.update_nsr_db(nsr_id, db_nsr)

    async def delete_ns(self, nsr_id, order_id):
        """
        Terminate a network service: launches the VCA RemoveApplication tasks, deletes the ns, the nsd and the
        vnfds at RO and finally removes the nsr record from database. Errors deleting a concrete item at RO are
        logged and the process continues with the next item
        :param nsr_id: nsr identity (database '_id')
        :param order_id: order identity, used as key to register the launched tasks at self.lcm_tasks
        :return: None
        """
        logging_text = "Task delete_ns={} ".format(nsr_id)
        self.logger.debug(logging_text + "Enter")
        db_nsr = None
        exc = None
        step = "Getting nsr from db"  # current action; used at the error message if something fails
        try:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsr_lcm = db_nsr["_admin"]["deploy"]

            db_nsr["operational-status"] = "terminate"
            db_nsr["config-status"] = "terminate"
            db_nsr["detailed-status"] = "Deleting charms"
            self.update_nsr_db(nsr_id, db_nsr)

            # charm removal is best effort: a failure here must not prevent the deletion at RO
            try:
                step = db_nsr["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                    # The original code looked for the misspelled key "appliation". Both spellings are accepted
                    # because the module that fills this dictionary is not visible here. TODO confirm and unify
                    if deploy_info and (deploy_info.get("application") or deploy_info.get("appliation")):
                        task = asyncio.ensure_future(
                            RemoveApplication(
                                self.config['VCA'],
                                self.db,
                                db_nsr,
                                vnf_index,
                            )
                        )
                        order_tasks = self.lcm_tasks.setdefault(nsr_id, {}).setdefault(order_id, {})
                        order_tasks["delete_charm:" + vnf_index] = task
            except Exception as e:
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

            # remove from RO
            RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
            # Delete ns
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
            if RO_nsr_id:
                try:
                    step = db_nsr["detailed-status"] = "Deleting ns at RO"
                    self.logger.debug(logging_text + step)
                    await RO.delete("ns", RO_nsr_id)
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                except ROclient.ROClientException as e:
                    if e.http_code == HTTPStatus.NOT_FOUND:
                        nsr_lcm["RO"]["nsr_id"] = None
                        nsr_lcm["RO"]["nsr_status"] = "DELETED"
                        self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                    elif e.http_code == HTTPStatus.CONFLICT:
                        self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    else:
                        self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.update_nsr_db(nsr_id, db_nsr)

            # Delete nsd
            RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
            if RO_nsd_id:
                try:
                    step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                    await RO.delete("nsd", RO_nsd_id)
                    self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                    nsr_lcm["RO"]["nsd_id"] = None
                except ROclient.ROClientException as e:
                    if e.http_code == HTTPStatus.NOT_FOUND:
                        nsr_lcm["RO"]["nsd_id"] = None
                        self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                    elif e.http_code == HTTPStatus.CONFLICT:
                        self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    else:
                        self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                self.update_nsr_db(nsr_id, db_nsr)

            # Delete vnfds. Only values are replaced while iterating, so the dictionary size does not change
            for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                except ROclient.ROClientException as e:
                    if e.http_code == HTTPStatus.NOT_FOUND:
                        nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == HTTPStatus.CONFLICT:
                        self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                    else:
                        self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                self.update_nsr_db(nsr_id, db_nsr)

            # TODO delete from database or mark as deleted???
            db_nsr["operational-status"] = "terminated"
            self.db.del_one("nsrs", {"_id": nsr_id})
            self.logger.debug(logging_text + "Exit")

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            # on failure, reflect the error at database (only possible if the nsr was read)
            if exc and db_nsr:
                db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr["operational-status"] = "failed"
                self.update_nsr_db(nsr_id, db_nsr)

    async def test(self, param=None):
        """
        Dummy task used for testing; it only writes a debug log
        :param param: any value, included at the log
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    def cancel_tasks(self, nsr_id):
        """
        Cancel all active tasks of a concrete nsr identified for nsr_id
        :param nsr_id: nsr identity
        :return: None, or raises an exception if not possible
        """
        if not self.lcm_tasks.get(nsr_id):
            return
        for order_id, tasks_set in self.lcm_tasks[nsr_id].items():
            for task_name, task in tasks_set.items():
                # cancel() returns False when the task is already done or cancelled
                if task.cancel():
                    self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
        self.lcm_tasks[nsr_id] = {}

    async def read_kafka(self):
        """
        Main loop. Reads commands of topic 'ns' from the message bus and attends them until 'exit' is received.
        'create' and 'delete' are launched as background tasks registered at self.lcm_tasks; the rest of the
        commands ('echo', 'test', 'break', 'show') are testing aids. Lines starting by '#' are ignored
        :return: None
        """
        self.logger.debug("kafka task Enter")
        order_id = 1

        while True:
            command, params = await self.msg.aioread("ns", self.loop)
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
            elif command == "test":
                asyncio.ensure_future(self.test(params))
            elif command == "break":
                print("put a break in this line of code")
            elif command == "create":
                nsr_id = params.strip()
                self.logger.debug("Deploying NS {}".format(nsr_id))
                task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
                self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"create_ns": task}
            elif command == "delete":
                nsr_id = params.strip()
                self.logger.debug("Deleting NS {}".format(nsr_id))
                # stop any on-going operation over this nsr before deleting it
                self.cancel_tasks(nsr_id)
                task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
                self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"delete_ns": task}
            elif command == "show":
                # just for test
                nsr_id = params.strip()
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}\n    detailed-status: "
                          "{}\n    deploy: {}\n    tasks: {}".format(
                              nsr_id, db_nsr["operational-status"],
                              db_nsr["config-status"], db_nsr["detailed-status"],
                              db_nsr["_admin"]["deploy"], self.lcm_tasks.get(nsr_id)))
                except Exception as e:
                    print("nsr {} not found: {}".format(nsr_id, e))
            else:
                self.logger.critical("unknown command '{}'".format(command))
        self.logger.debug("kafka task Exit")

    def start(self):
        """
        Run the main loop (read_kafka) at the current asyncio event loop until it ends. The loop is closed at
        exit, also when read_kafka raises an exception
        :return: None
        """
        # The current event loop is kept (not a new one); NOTE(review): the drivers connected at __init__ may
        # have taken a reference to it - confirm before changing this
        self.loop = asyncio.get_event_loop()
        try:
            self.loop.run_until_complete(self.read_kafka())
        finally:
            self.loop.close()
            self.loop = None

    def read_config_file(self, config_file):
        """
        Load the YAML configuration file and override its values with the environment variables that start
        by 'OSMLCM_'. Example: OSMLCM_RO_HOST=ro overrides ['RO']['host']. A value whose last key is 'port' is
        converted to integer. An environment variable that cannot be applied is skipped with a warning
        :param config_file: path of the configuration file
        :return: the configuration dictionary; None (after logging a critical error) if the file cannot be loaded
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # safe_load: yaml.load() without an explicit Loader is rejected by PyYAML >= 6 and it can build
                # arbitrary python objects. A configuration file only needs plain YAML types
                conf = yaml.safe_load(f)
            for k, v in environ.items():
                if not k.startswith("OSMLCM_"):
                    continue
                k_items = k.lower().split("_")
                c = conf
                try:
                    # walk the intermediate sections; the last item is the key to set
                    for k_item in k_items[1:-1]:
                        if k_item in ("ro", "vca"):
                            # put in capital letter
                            k_item = k_item.upper()
                        c = c[k_item]
                    if k_items[-1] == "port":
                        c[k_items[-1]] = int(v)
                    else:
                        c[k_items[-1]] = v
                except Exception as e:
                    self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

            return conf
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            return None
541
542
if __name__ == '__main__':
    # Script entry point: build the LCM from the configuration file placed at the working directory and
    # run its main loop until it ends
    config_file = "lcm.cfg"
    lcm = Lcm(config_file=config_file)
    lcm.start()