6e0f8ab6f0d264fed33c0a5dc41210d9c1c27964
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 # import os.path
22 # import time
23
24 from copy import deepcopy
25 from http import HTTPStatus
26 from time import time
27
28
class LcmException(Exception):
    """Error raised by the LCM itself, e.g. on an invalid configuration value or an unreadable vnfd file."""
31
32
33 class Lcm:
34
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration file reference, handed as is to self.read_config_file. The two level
        dictionary obtained from it is indexed here by 'global', 'RO', 'VCA', 'database', 'storage' and 'message'
    :return: None
    :raises LcmException: when a 'driver' entry holds an unsupported value, or when connecting to the database,
        storage or messaging raises DbException, FsException or MsgException
    """
    # 'import logging' alone does not guarantee that the 'logging.handlers' submodule is loaded, so the handler
    # class is imported explicitly
    from logging.handlers import RotatingFileHandler

    self.db = None
    self.msg = None
    self.fs = None
    self.pings_not_received = 1

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = RotatingFileHandler(config["global"]["logfile"], maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules. The logger name is also stored in each section so that the driver receives it
    # together with the rest of its configuration
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = RotatingFileHandler(config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )

    try:
        db_driver = config["database"]["driver"]
        if db_driver == "mongo":
            self.db = dbmongo.DbMongo()
        elif db_driver == "memory":
            self.db = dbmemory.DbMemory()
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
        self.db.db_connect(config["database"])

        fs_driver = config["storage"]["driver"]
        if fs_driver == "local":
            self.fs = fslocal.FsLocal()
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
        self.fs.fs_connect(config["storage"])

        msg_driver = config["message"]["driver"]
        if msg_driver == "local":
            self.msg = msglocal.MsgLocal()
        elif msg_driver == "kafka":
            self.msg = msgkafka.MsgKafka()
        else:
            # report the message driver (the storage driver was printed here before)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
        self.msg.connect(config["message"])
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e))
140
def update_db(self, item, _id, _desc):
    """
    Replaces a database entry by calling self.db.replace. A DbException is logged as an error and not propagated
    :param item: first argument given to self.db.replace; callers in this class pass names such as "vims" or "nsrs"
    :param _id: identifier of the entry to replace
    :param _desc: new content of the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
146
def update_db_2(self, item, _id, _desc):
    """
    Updates a database entry by calling self.db.set_one with the filter {"_id": _id}. A DbException is logged as
    an error and not propagated
    :param item: first argument given to self.db.set_one; the caller in this class passes "nsrs"
    :param _id: identifier of the entry to update
    :param _desc: content handed to self.db.set_one as the update
    :return: None
    """
    query = {"_id": _id}
    try:
        self.db.set_one(item, query, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
152
async def vim_create(self, vim_content, order_id):
    """
    Creates a vim at RO, attaches it to the RO tenant and records the outcome at the 'vims' database entry
    :param vim_content: vim description. '_id', 'vim_type', 'vim_tenant_name', 'vim_user', 'vim_password' and
        'config' are read from it; the rest is sent to RO as the vim descriptor
    :param order_id: not used by this method
    :return: the uuid returned by RO for the created vim. None when the operation fails: the error is logged and
        written at '_admin' of the database entry instead of being raised
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    try:
        step = "Getting vim from db"
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deployed" not in db_vim["_admin"]:
            db_vim["_admin"]["deployed"] = {}
        db_vim["_admin"]["deployed"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        vim_RO = deepcopy(vim_content)
        # internal fields and tenant credentials are not part of the RO vim descriptor; the credentials are sent
        # below, when attaching the vim to the tenant
        for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        vim_RO.pop("vim_user", None)
        vim_RO.pop("vim_password", None)
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        # stored right away, so the RO id is not lost if the attach step fails
        db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
        self.update_db("vims", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
            "config": vim_content["config"],
        }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
208
async def vim_edit(self, vim_content, order_id):
    """
    Applies an edition of a vim at RO (vim and vim account) and records the outcome at the 'vims' database entry.
    Nothing is sent to RO when the database entry holds no RO identifier at _admin.deployed.RO
    :param vim_content: edited fields of the vim; must contain '_id'. It is not modified
    :param order_id: not used by this method
    :return: the RO identifier of the vim, or None when the database entry has no RO information. None as well
        when the operation fails: the error is logged and written at '_admin' of the database entry
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # stays None when the entry was never deployed at RO
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name"):
                vim_RO.pop(key, None)
            # the content is treated as partial (see the membership tests below), so 'vim_type' may be absent
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_RO = {k: vim_content[k] for k in ("vim_tenant_name", "vim_password", "config") if k in vim_content}
            if "vim_user" in vim_content:
                # the account descriptor names this field 'vim_username', as in vim_create
                vim_RO["vim_username"] = vim_content["vim_user"]
            if vim_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # '_admin' is optional at the stored entry, so make sure it exists before writing the error
            admin = db_vim.get("_admin") or {}
            db_vim["_admin"] = admin
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
260
async def vim_delete(self, vim_id, order_id):
    """
    Detaches the vim from the RO tenant, deletes it from RO and finally removes the 'vims' database entry. A 404
    answer from RO at any of both steps is taken as already done. When the database entry has no RO identifier at
    _admin.deployed.RO only the database entry is removed
    :param vim_id: '_id' of the vim at the 'vims' database table
    :param order_id: not used by this method
    :return: None. Failures are not raised: they are logged and written at '_admin' of the database entry
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:
                    raise
                # not found
                self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:
                    raise
                # not found
                self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vims", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # '_admin' is optional at the stored entry, so make sure it exists before writing the error
            admin = db_vim.get("_admin") or {}
            db_vim["_admin"] = admin
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
307
async def sdn_create(self, sdn_content, order_id):
    """
    Creates a sdn at RO and records the outcome at the 'sdns' database entry
    :param sdn_content: sdn description; must contain '_id'. Without '_id', '_admin', 'schema_version' and
        'schema_type' it is sent to RO as the sdn descriptor
    :param order_id: not used by this method
    :return: the uuid returned by RO for the created sdn. None when the operation fails: the error is logged and
        written at '_admin' of the database entry instead of being raised
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    try:
        step = "Getting sdn from db"
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deployed" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deployed"] = {}
        db_sdn["_admin"]["deployed"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
349
async def sdn_edit(self, sdn_content, order_id):
    """
    Applies an edition of a sdn at RO and records the outcome at the 'sdns' database entry. Nothing is sent to RO
    when the database entry holds no RO identifier at _admin.deployed.RO
    :param sdn_content: edited fields of the sdn; must contain '_id'
    :param order_id: not used by this method
    :return: the RO identifier of the sdn, or None when the database entry has no RO information. None as well
        when the operation fails: the error is logged and written at '_admin' of the database entry
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # stays None when the entry was never deployed at RO
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # '_admin' is optional at the stored entry, so make sure it exists before writing the error
            admin = db_sdn.get("_admin") or {}
            db_sdn["_admin"] = admin
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
387
async def sdn_delete(self, sdn_id, order_id):
    """
    Deletes the sdn from RO and removes the 'sdns' database entry. A 404 answer from RO is taken as already
    deleted. When the database entry has no RO identifier at _admin.deployed.RO only the database entry is removed
    :param sdn_id: '_id' of the sdn at the 'sdns' database table
    :param order_id: not used by this method
    :return: None. Failures are not raised: they are logged and written at '_admin' of the database entry
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:
                    raise
                # not found
                self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # '_admin' is optional at the stored entry, so make sure it exists before writing the error
            admin = db_sdn.get("_admin") or {}
            db_sdn["_admin"] = admin
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
425
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor to be sent to RO out of an OSM IM vnfd. The input is not modified: a deep copy is
    made, '_id' and '_admin' are dropped and every vdu 'cloud-init-file' entry is replaced by a 'cloud-init' entry
    holding the content of that file, read through self.fs
    :param vnfd: input vnfd. Its '_admin'.'storage' ('folder' and 'pkg-dir') locates the cloud init files
    :param new_id: overrides vnf id if provided
    :return: the new descriptor
    :raises LcmException: when reading a cloud init file raises FsException
    """
    open_file = None
    try:
        ro_descriptor = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_descriptor.pop(internal_key, None)
        if new_id:
            ro_descriptor["id"] = new_id
        for vdu in ro_descriptor["vdu"]:
            if "cloud-init-file" not in vdu:
                continue
            # '_admin' was removed from the copy, so the storage location is taken from the input vnfd
            storage = vnfd["_admin"]["storage"]
            file_name = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"], vdu["cloud-init-file"])
            open_file = self.fs.file_open(file_name, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert
            # to base 64 or similar
            content = open_file.read()
            open_file.close()
            open_file = None  # already closed; nothing left for the finally clause
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = content
        return ro_descriptor
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        if open_file:
            open_file.close()
461
def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, db_nslcmop, vnf_member_index, task=None):
    """Update the lcm database with the status of the charm.

    Updates the VNF's operational status with the state of the charm:
    - blocked: The unit needs manual intervention
    - maintenance: The unit is actively deploying/configuring
    - waiting: The unit is waiting for another charm to be ready
    - active: The unit is deployed, configured, and ready
    - error: The charm has failed and needs attention.
    - terminated: The charm has been destroyed
    - removing,
    - removed

    Updates the network service's config-status to reflect the state of all
    charms.

    It is invoked in two ways: with a workload_status, to report a status change of one charm, or with a task
    (when used as done callback of a task), to report that the task ended. Exceptions are logged and not raised.

    :param model_name: not used by this method
    :param application_name: not used by this method
    :param workload_status: new status of the charm; ignored when task is provided
    :param db_nsr: content of the 'nsrs' database entry. It is modified and written back to the database
    :param db_nslcmop: content of the 'nslcmops' database entry. It is modified and written back to the database
    :param vnf_member_index: key at db_nsr["_admin"]["deployed"]["VCA"] of the charm this call refers to
    :param task: ended task, exposing cancelled(), done() and exception()
    :return: None
    """
    nsr_id = None
    nslcmop_id = None
    update_nsr = update_nslcmop = False
    try:
        nsr_id = db_nsr["_id"]
        nslcmop_id = db_nslcmop["_id"]
        nsr_lcm = db_nsr["_admin"]["deployed"]
        ns_action = db_nslcmop["lcmOperationType"]
        logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
                                                                              vnf_member_index)

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                # TODO update db_nslcmop
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    if ns_action in ("instantiate", "terminate"):
                        # no return here: the aggregation below turns this error into the ns status
                        nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                        nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                    elif ns_action == "action":
                        db_nslcmop["operationState"] = "FAILED"
                        db_nslcmop["detailedStatus"] = str(exc)
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                        return

                else:
                    self.logger.debug(logging_text + " task Done")
                    # TODO revise with Adam if action is finished and ok when task is done
                    if ns_action == "action":
                        db_nslcmop["operationState"] = "COMPLETED"
                        db_nslcmop["detailedStatus"] = "Done"
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif workload_status:
            self.logger.debug(logging_text + " Enter workload_status={}".format(workload_status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
            # TODO N2VC some error message in case of error should be obtained from N2VC
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return

        # aggregate the status of every charm into the status of the network service
        some_failed = False
        all_active = True
        status_map = {}
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            if vca_status == "error":
                some_failed = True
                db_nsr["config-status"] = "failed"
                # name the vnf that is really in error, which is not necessarily the one that triggered this call
                error_text = "fail configuring vnf_index={} {}".format(vnf_index, vca_info["detailed-status"])
                db_nsr["detailed-status"] = error_text
                db_nslcmop["operationState"] = "FAILED_TEMP"
                db_nslcmop["detailedStatus"] = error_text
                db_nslcmop["statusEnteredTime"] = time()
                break

        if all_active:
            self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index))
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailedStatus"] = "Done"
            db_nslcmop["statusEnteredTime"] = time()
        elif not some_failed:
            # still in progress: summarize how many charms are at each status
            cs = "configuring: " + ", ".join("{}: {}".format(status, num) for status, num in status_map.items())
            db_nsr["config-status"] = cs
            db_nslcmop["detailedStatus"] = cs
        update_nsr = update_nslcmop = True

    except Exception as e:
        self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
    finally:
        # runs also for the early returns above, which only set the flags of what must be stored
        try:
            if update_nslcmop:
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
            if update_nsr:
                self.update_db("nsrs", nsr_id, db_nsr)
        except Exception as e:
            self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
                vnf_member_index, e), exc_info=True)
583
async def ns_instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiates a network service. Steps:
    - reads the nsr, the nslcmop, the vim and the needed vnfds from the database
    - creates at RO the vnfds, the nsd and the ns, reusing the ones that already exist
    - polls RO until the ns is ACTIVE (fails on ERROR or after 600 seconds)
    - launches one charm deployment task per vnf/vdu that declares a juju charm; those tasks report back through
      self.n2vc_callback
    Progress and result are written at the 'nsrs' and 'nslcmops' database entries.
    :param nsr_id: '_id' of the network service record at the 'nsrs' database table
    :param nslcmop_id: '_id' of the operation at the 'nslcmops' database table
    :return: the content of db_nsr["_admin"]["deployed"]. None when the operation fails: the error is logged and
        written to the database entries instead of being raised
    """
    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    exc = None
    step = "Getting nsr, nslcmop, RO_vims from db"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = db_nsr["nsd"]
        nsr_name = db_nsr["name"]  # TODO short-name??

        db_vim = self.db.get_one("vims", {"_id": db_nsr["datacenter"]})
        # if db_vim["_admin"]["operationalState"] == "PROCESSING":
        #     #TODO check if VIM is creating and wait
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalSstatus={}".format(
                db_nsr["datacenter"], db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        needed_vnfd = {}
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            if vnfd_id not in needed_vnfd:
                step = "Getting vnfd={} from db".format(vnfd_id)
                needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

        nsr_lcm = db_nsr["_admin"].get("deployed")
        if not nsr_lcm:
            nsr_lcm = db_nsr["_admin"]["deployed"] = {
                "id": nsr_id,
                "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
                "nsr_ip": {},
                "VCA": {},
            }
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        RO = ROclient.ROClient(self.loop, datacenter=RO_vim_id, **self.ro_config)

        # get vnfds, instantiate at RO
        for vnfd_id, vnfd in needed_vnfd.items():
            step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
            self.logger.debug(logging_text + step)
            # descriptors at RO are private to this ns instance: their id is prefixed with the nsr id
            vnfd_id_RO = nsr_id + "." + vnfd_id[:200]

            # look if present
            vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
                    vnfd_id, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
                desc = await RO.create("vnfd", descriptor=vnfd_RO)
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
                db_nsr["_admin"]["nsState"] = "INSTANTIATED"
            self.update_db("nsrs", nsr_id, db_nsr)

        # create nsd at RO
        nsd_id = nsd["id"]
        step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
        self.logger.debug(logging_text + step)

        # prefixed with the nsr id, as the vnfds it references (it was wrongly prefixed with the nsd id itself,
        # making different ns instances of the same nsd share one RO nsd)
        nsd_id_RO = nsr_id + "." + nsd_id[:200]
        nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
        if nsd_list:
            nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
                nsd_id, nsd_list[0]["uuid"]))
        else:
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = nsd_id_RO
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
            desc = await RO.create("nsd", descriptor=nsd_RO)
            db_nsr["_admin"]["nsState"] = "INSTANTIATED"
            nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
        self.update_db("nsrs", nsr_id, db_nsr)

        # Create ns at RO
        # if present use it unless in error status
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
                self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                desc = await RO.show("ns", RO_nsr_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    step = db_nsr["detailed-status"] = "Deleting ns at RO"
                    self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                    await RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None

        if not RO_nsr_id:
            step = db_nsr["detailed-status"] = "Creating ns at RO"
            self.logger.debug(logging_text + step)

            desc = await RO.create("ns", name=db_nsr["name"], datacenter=RO_vim_id,
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
            db_nsr["_admin"]["nsState"] = "INSTANTIATED"
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
        self.update_db("nsrs", nsr_id, db_nsr)

        # wait until NS is ready
        step = ns_status_detailed = "Waiting ns ready at RO"
        db_nsr["detailed-status"] = ns_status_detailed
        self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
        deployment_timeout = 600
        while deployment_timeout > 0:
            desc = await RO.show("ns", RO_nsr_id)
            ns_status, ns_status_info = RO.check_ns_status(desc)
            nsr_lcm["RO"]["nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
                self.update_db("nsrs", nsr_id, db_nsr)
            elif ns_status == "ACTIVE":
                nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                break
            else:
                # an explicit exception instead of 'assert', which is removed when running with -O
                raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

            # no 'loop' argument: it is deprecated and was removed from asyncio.sleep at python 3.10
            await asyncio.sleep(5)
            deployment_timeout -= 5
        if deployment_timeout <= 0:
            raise ROclient.ROClientException("Timeout waiting ns to be ready")
        db_nsr["detailed-status"] = "Configuring vnfr"
        self.update_db("nsrs", nsr_id, db_nsr)

        # The parameters we'll need to deploy a charm
        number_to_configure = 0

        def deploy(vnfd_id, vnfd, vnf_index, proxy_charm, params):
            """An inner function to deploy the charm from either vnf or vdu.
            It registers the charm at nsr_lcm["VCA"][vnf_index] and launches, without waiting for it, the task
            that deploys the charm
            """
            # Note: The charm needs to exist on disk at the location
            # specified by charm_path.
            base_folder = vnfd["_admin"]["storage"]
            storage_params = self.fs.get_params()
            charm_path = "{}{}/{}/charms/{}".format(
                storage_params["path"],
                base_folder["folder"],
                base_folder["pkg-dir"],
                proxy_charm
            )

            # Setup the runtime parameters for this VNF
            params['rw_mgmt_ip'] = nsr_lcm['nsr_ip']["vnf"][vnf_index]

            # ns_name will be ignored in the current version of N2VC
            # but will be implemented for the next point release.
            model_name = 'default'
            application_name = self.n2vc.FormatApplicationName(
                nsr_name,
                vnf_index,
                vnfd['name'],
            )

            nsr_lcm["VCA"][vnf_index] = {
                "model": model_name,
                "application": application_name,
                "operational-status": "init",
                "detailed-status": "",
                "vnfd_id": vnfd_id,
            }

            self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
            task = asyncio.ensure_future(
                self.n2vc.DeployCharms(
                    model_name,          # The network service name
                    application_name,    # The application name
                    vnfd,                # The vnf descriptor
                    charm_path,          # Path to charm
                    params,              # Runtime params, like mgmt ip
                    {},                  # for native charms only
                    self.n2vc_callback,  # Callback for status changes
                    db_nsr,              # Callback parameter
                    db_nslcmop,
                    vnf_index,           # Callback parameter
                    None,                # Callback parameter (task)
                )
            )
            # the done callback receives the task as last positional argument, i.e. as 'task' of n2vc_callback
            task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
                                                     db_nsr, db_nslcmop, vnf_index))
            self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task

        # TODO: Make this call inside deploy()
        # Login to the VCA. If there are multiple calls to login(),
        # subsequent calls will be a nop and return immediately.
        await self.n2vc.login()

        step = "Looking for needed vnfd to configure"
        self.logger.debug(logging_text + step)
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnf_index = str(c_vnf["member-vnf-index"])
            vnfd = needed_vnfd[vnfd_id]

            # Check if this VNF has a charm configuration
            vnf_config = vnfd.get("vnf-configuration")

            if vnf_config and vnf_config.get("juju"):
                proxy_charm = vnf_config["juju"]["charm"]
                params = {}

                if proxy_charm:
                    if 'initial-config-primitive' in vnf_config:
                        params['initial-config-primitive'] = vnf_config['initial-config-primitive']

                    deploy(vnfd_id, vnfd, vnf_index, proxy_charm, params)
                    number_to_configure += 1

            # Deploy charms for each VDU that supports one.
            for vdu in vnfd['vdu']:
                vdu_config = vdu.get('vdu-configuration')
                proxy_charm = None
                params = {}

                if vdu_config and vdu_config.get("juju"):
                    proxy_charm = vdu_config["juju"]["charm"]

                    if 'initial-config-primitive' in vdu_config:
                        params['initial-config-primitive'] = vdu_config['initial-config-primitive']

                    if proxy_charm:
                        deploy(vnfd_id, vnfd, vnf_index, proxy_charm, params)
                        number_to_configure += 1

        if number_to_configure:
            db_nsr["config-status"] = "configuring"
            db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
            db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
        else:
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailed-status"] = "done"
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
        db_nsr["operational-status"] = "running"
        self.update_db("nsrs", nsr_id, db_nsr)
        self.update_db("nslcmops", nslcmop_id, db_nslcmop)
        self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
        return nsr_lcm

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc:
            if db_nsr:
                db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr["operational-status"] = "failed"
                self.update_db("nsrs", nsr_id, db_nsr)
            if db_nslcmop:
                db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop["operationState"] = "FAILED"
                db_nslcmop["statusEnteredTime"] = time()
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
871
async def ns_terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.

    Launches the removal of the deployed charms, deletes the ns, the nsd and the vnfds at RO, waits for the
    charm removals and finally writes the outcome at the "nsrs" and "nslcmops" database records. When the
    operation parameters carry "autoremove" and nothing failed, both records are deleted instead.
    :param nsr_id: _id of the "nsrs" record to terminate
    :param nslcmop_id: _id of the "nslcmops" record that describes this operation
    :return: None. Exceptions raised while terminating are caught, logged and annotated at the nslcmop record
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    # both are read at the finally clause, so they must exist on every exit path (early return included)
    db_nsr_update = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop from db"
    failed_detail = []  # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # work over a copy; it is stored back under _admin.deployed once the deletion finishes
        nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # TODO check if VIM is creating and wait

        db_nsr_update = {
            "operational-status": "terminating",
            "config-status": "terminating",
            "detailed-status": "Deleting charms",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # charm removals are only launched here; they run while RO is being cleaned and are awaited later
        try:
            step = "Deleting charms"
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                        )
                    )
                    vca_task_list.append(task)
                    vca_task_dict[vnf_index] = task
                    self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            # best effort: a failure launching the removals must not prevent the RO deletion
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # Delete ns
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:  # conflict
                    failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:  # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:  # conflict
                    failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # collect the result of the charm removals launched at the beginning
        if vca_task_list:
            await asyncio.wait(vca_task_list, timeout=300)
        for vnf_index, task in vca_task_dict.items():
            if task.cancelled():
                failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
            elif task.done():
                # kept apart from 'exc': a charm failure is reported through failed_detail, and must not
                # make the finally clause overwrite that report
                task_exc = task.exception()
                if task_exc:
                    failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, task_exc))
                else:
                    nsr_lcm["VCA"][vnf_index] = None
            else:  # timeout
                # TODO Should it be cancelled?!!
                task.cancel()
                failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update = {
                "operational-status": "failed",
                "detailed-status": "Deletion errors " + "; ".join(failed_detail),
                "_admin": {"deployed": nsr_lcm, }
            }
            db_nslcmop_update = {
                "detailedStatus": "; ".join(failed_detail),
                "operationState": "FAILED",
                "statusEnteredTime": time()
            }
        elif db_nslcmop["operationParams"].get("autoremove"):
            self.db.del_one("nsrs", {"_id": nsr_id})
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            # the records are gone; nothing must be written back at the finally clause
            db_nsr_update = None
            db_nslcmop_update = None
        else:
            db_nsr_update = {
                "operational-status": "terminated",
                "detailed-status": "Done",
                "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
            }
            db_nslcmop_update = {
                "detailedStatus": "Done",
                "operationState": "COMPLETED",
                "statusEnteredTime": time()
            }
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            # NOTE(review): this path writes "detailed-status" whereas the other paths of this method
            # write "detailedStatus" - confirm which key the nslcmop record is expected to carry
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1046
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute a primitive over the charm deployed for one vnf of a network service.

    Reads the target vnf ("vnf_member_index"), the primitive and its parameters from the operationParams of
    the nslcmop record, runs the primitive through self.n2vc waiting up to 300 seconds, and stores the
    outcome (COMPLETED or FAILED plus a detail text) at the nslcmop record.
    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: _id of the "nslcmops" record that describes this operation
    :return: None. Exceptions are caught, logged and annotated at the nslcmop record
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]

        # TODO check if ns is in a proper status
        vca_deployed = nsr_lcm["VCA"].get(vnf_index)
        if not vca_deployed:
            raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
                vnf_index, vca_deployed["operational-status"]))
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, vnf_index, None]
        await self.n2vc.login()
        task = asyncio.ensure_future(
            self.n2vc.ExecutePrimitive(
                model_name,
                application_name,
                primitive, callback,
                *callback_args,
                **primitive_params
            )
        )
        # wait until completed with timeout
        await asyncio.wait((task,), timeout=300)

        result = "FAILED"  # by default
        result_detail = ""
        if task.cancelled():
            result_detail = "Task has been cancelled"
        elif task.done():
            # kept apart from 'exc': a failed primitive is reported through result_detail, and must not
            # make the finally clause replace the update prepared below
            task_exc = task.exception()
            if task_exc is not None:
                result_detail = str(task_exc)
            else:
                self.logger.debug(logging_text + " task Done")
                # TODO revise with Adam if action is finished and ok when task is done or callback is needed
                result = "COMPLETED"
                result_detail = "Done"
        else:  # timeout
            # TODO Should it be cancelled?!!
            task.cancel()
            result_detail = "timeout"

        db_nslcmop_update = {
            "detailedStatus": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            # NOTE(review): this path writes "detailed-status" whereas the regular path writes
            # "detailedStatus" - confirm which key the nslcmop record is expected to carry
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1134
async def test(self, param=None):
    """
    Write one debug line that carries param; it is scheduled by kafka_read on the "test" command.
    :param param: any value; it is only formatted into the log text
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
1137
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified for _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id: nsr, vim or sdn identity
    :return: None. An unknown topic is logged and ignored
    """
    registries = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    lcm_tasks = registries.get(topic)
    if lcm_tasks is None:
        # without this guard an unexpected topic ends in an UnboundLocalError
        self.logger.warning("cancel_tasks: unknown topic '{}', nothing cancelled".format(topic))
        return

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() is reported only when it returns a true value
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
1160
async def kafka_ping(self):
    """
    Endless task that writes an "admin"/"ping" message addressed to lcm itself on the message bus.

    self.pings_not_received is increased after every ping written; kafka_read sets it back to 0 when the
    ping is read. The task only finishes by raising.
    :return: never returns normally
    :raises LcmException: when more than 10 pings in a row have not been read back
    :raises Exception: the last write error, once the retries (30 before the first successful cycle,
        8 afterwards) are exhausted
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # the time between pings is short at starting and while no ping has been received
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # no 'loop' argument: asyncio.sleep does not accept it since python 3.10
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # while first_start no cycle has succeeded yet, so allow more retries (and longer waits)
            # to let kafka start. The limit is computed apart because
            # 'a == 8 if cond else 30' is parsed as '(a == 8) if cond else 30'
            max_errors = 30 if first_start else 8
            if consecutive_errors == max_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 1 if not first_start else 5
            await asyncio.sleep(wait_time)
1192
async def kafka_read(self):
    """
    Read orders from the message bus and launch the task that attends each one.

    Listens to the topics "admin", "ns", "vim_account" and "sdn". Every launched task is annotated at
    self.lcm_ns_tasks, self.lcm_vim_tasks or self.lcm_sdn_tasks so that cancel_tasks can find it later.
    The loop finishes when the command "exit" is read.
    :return: None
    :raises Exception: the last error, once the retries (30 before the first message is read,
        8 afterwards) are exhausted
    """
    self.logger.debug("Task kafka_read Enter")
    order_id = 1
    consecutive_errors = 0
    first_start = True
    topics = ("admin", "ns", "vim_account", "sdn")

    def launch(registry, item_id, op_id, task_name, coro):
        # schedule coro and annotate its task at registry[item_id][op_id]
        task = asyncio.ensure_future(coro)
        registry.setdefault(item_id, {})[op_id] = {task_name: task}

    while True:
        try:
            topic, command, params = await self.msg.aioread(topics, self.loop)
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            first_start = False
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
                continue

            if topic == "admin":
                if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                    # own ping is back: tells kafka_ping that the bus is working
                    self.pings_not_received = 0
                    continue
            elif topic == "ns":
                if command == "instantiate":
                    nslcmop_id = params["_id"]
                    nsr_id = params["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_instantiate",
                           self.ns_instantiate(nsr_id, nslcmop_id))
                    continue
                elif command == "terminate":
                    nslcmop_id = params["_id"]
                    nsr_id = params["nsInstanceId"]
                    # anything still running for this ns is cancelled before terminating it
                    self.cancel_tasks(topic, nsr_id)
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_terminate",
                           self.ns_terminate(nsr_id, nslcmop_id))
                    continue
                elif command == "action":
                    nslcmop_id = params["_id"]
                    nsr_id = params["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_action",
                           self.ns_action(nsr_id, nslcmop_id))
                    continue
                elif command == "show":
                    # presumably params is the nsr _id, or a dict that carries it - TODO confirm with
                    # the sender. It must be taken from this message and not from a previous order
                    nsr_id = params.get("_id") if isinstance(params, dict) else params
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}\n    detailed-status: "
                            "{}\n    deploy: {}\n    tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
                elif command == "deleted":
                    continue  # TODO cleaning of task just in case should be done
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_create",
                           self.vim_create(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_delete",
                           self.vim_delete(vim_id, order_id))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_edit",
                           self.vim_edit(vim_id, order_id))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_create",
                           self.sdn_create(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_delete",
                           self.sdn_delete(_sdn_id, order_id))
                    continue
                elif command == "edit":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_edit",
                           self.sdn_edit(_sdn_id, order_id))
                    continue
            # every attended order has executed 'continue' above
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            # while first_start nothing has been read yet, so allow more retries (and longer waits)
            # to let kafka start. The limit is computed apart because
            # 'a == 8 if cond else 30' is parsed as '(a == 8) if cond else 30'
            max_errors = 30 if first_start else 8
            if consecutive_errors == max_errors:
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 2 if not first_start else 5
            # no 'loop' argument: asyncio.sleep does not accept it since python 3.10
            await asyncio.sleep(wait_time)

    self.logger.debug("Task kafka_read exit")
1332
def start(self):
    """
    Run the kafka_read and kafka_ping tasks until both finish or one of them raises, then release the
    event loop and disconnect database, message bus and file storage.
    :return: None
    :raises Exception: whatever kafka_read or kafka_ping raises; the resources are released anyway
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO cancel the creation tasks and wait (with a timeout) for the pending ones before closing
    finally:
        # also reached when a task raises, so that the connections are never left open
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1357
1358
def read_config_file(self, config_file):
    """
    Load the yaml configuration file and overwrite its values with the OSMLCM_* environment variables.

    A variable OSMLCM_<SECTION>_<KEY> is stored at conf[<section>][<key>]. The name is lowercased, except
    the sections "ro" and "vca" that are looked up in capital letters; a key named "port" is converted to
    integer. A variable that cannot be applied (e.g. unknown section) is skipped with a warning.
    :param config_file: path of the configuration file
    :return: dictionary with the configuration. If the file cannot be read or parsed the error is logged
        and the process exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: the file only holds plain data, and yaml.load() without an explicit Loader is
            # rejected by recent PyYAML versions
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down the sections; the last item is the key to set
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        sys.exit(1)
1388
1389
if __name__ == '__main__':
    # Script entry point: build the LCM from the configuration file "lcm.cfg" (a relative path, so it is
    # opened from the current working directory) and serve the message bus until start() comes back.
    config_file = "lcm.cfg"
    lcm = Lcm(config_file=config_file)
    lcm.start()