LWB increase deployment timeout. Check vnf management ip is returned
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 # import os.path
22 # import time
23
24 from copy import deepcopy
25 from http import HTTPStatus
26 from time import time
27
28
29 class LcmException(Exception):
30 pass
31
32
33 class Lcm:
34
35 def __init__(self, config_file):
36 """
37 Init, Connect to database, filesystem storage, and messaging
38 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
39 :return: None
40 """
41
42 self.db = None
43 self.msg = None
44 self.fs = None
45 self.pings_not_received = 1
46
47 # contains created tasks/futures to be able to cancel
48 self.lcm_ns_tasks = {}
49 self.lcm_vim_tasks = {}
50 self.lcm_sdn_tasks = {}
51 # logging
52 self.logger = logging.getLogger('lcm')
53 # load configuration
54 config = self.read_config_file(config_file)
55 self.config = config
56 self.ro_config={
57 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
58 "tenant": config.get("tenant", "osm"),
59 "logger_name": "lcm.ROclient",
60 "loglevel": "ERROR",
61 }
62
63 self.vca = config["VCA"] # TODO VCA
64 self.loop = None
65
66 # logging
67 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
68 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
69 config["database"]["logger_name"] = "lcm.db"
70 config["storage"]["logger_name"] = "lcm.fs"
71 config["message"]["logger_name"] = "lcm.msg"
72 if "logfile" in config["global"]:
73 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
74 maxBytes=100e6, backupCount=9, delay=0)
75 file_handler.setFormatter(log_formatter_simple)
76 self.logger.addHandler(file_handler)
77 else:
78 str_handler = logging.StreamHandler()
79 str_handler.setFormatter(log_formatter_simple)
80 self.logger.addHandler(str_handler)
81
82 if config["global"].get("loglevel"):
83 self.logger.setLevel(config["global"]["loglevel"])
84
85 # logging other modules
86 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
87 config[k1]["logger_name"] = logname
88 logger_module = logging.getLogger(logname)
89 if "logfile" in config[k1]:
90 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
91 maxBytes=100e6, backupCount=9, delay=0)
92 file_handler.setFormatter(log_formatter_simple)
93 logger_module.addHandler(file_handler)
94 if "loglevel" in config[k1]:
95 logger_module.setLevel(config[k1]["loglevel"])
96
97 self.n2vc = N2VC(
98 log=self.logger,
99 server=config['VCA']['host'],
100 port=config['VCA']['port'],
101 user=config['VCA']['user'],
102 secret=config['VCA']['secret'],
103 # TODO: This should point to the base folder where charms are stored,
104 # if there is a common one (like object storage). Otherwise, leave
105 # it unset and pass it via DeployCharms
106 # artifacts=config['VCA'][''],
107 artifacts=None,
108 )
109
110 try:
111 if config["database"]["driver"] == "mongo":
112 self.db = dbmongo.DbMongo()
113 self.db.db_connect(config["database"])
114 elif config["database"]["driver"] == "memory":
115 self.db = dbmemory.DbMemory()
116 self.db.db_connect(config["database"])
117 else:
118 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
119 config["database"]["driver"]))
120
121 if config["storage"]["driver"] == "local":
122 self.fs = fslocal.FsLocal()
123 self.fs.fs_connect(config["storage"])
124 else:
125 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
126 config["storage"]["driver"]))
127
128 if config["message"]["driver"] == "local":
129 self.msg = msglocal.MsgLocal()
130 self.msg.connect(config["message"])
131 elif config["message"]["driver"] == "kafka":
132 self.msg = msgkafka.MsgKafka()
133 self.msg.connect(config["message"])
134 else:
135 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
136 config["storage"]["driver"]))
137 except (DbException, FsException, MsgException) as e:
138 self.logger.critical(str(e), exc_info=True)
139 raise LcmException(str(e))
140
141 def update_db(self, item, _id, _desc):
142 try:
143 self.db.replace(item, _id, _desc)
144 except DbException as e:
145 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
146
147 def update_db_2(self, item, _id, _desc):
148 try:
149 self.db.set_one(item, {"_id": _id}, _desc)
150 except DbException as e:
151 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
152
153 async def vim_create(self, vim_content, order_id):
154 vim_id = vim_content["_id"]
155 logging_text = "Task vim_create={} ".format(vim_id)
156 self.logger.debug(logging_text + "Enter")
157 db_vim = None
158 exc = None
159 try:
160 step = "Getting vim from db"
161 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
162 if "_admin" not in db_vim:
163 db_vim["_admin"] = {}
164 if "deployed" not in db_vim["_admin"]:
165 db_vim["_admin"]["deployed"] = {}
166 db_vim["_admin"]["deployed"]["RO"] = None
167
168 step = "Creating vim at RO"
169 RO = ROclient.ROClient(self.loop, **self.ro_config)
170 vim_RO = deepcopy(vim_content)
171 vim_RO.pop("_id", None)
172 vim_RO.pop("_admin", None)
173 vim_RO.pop("schema_version", None)
174 vim_RO.pop("schema_type", None)
175 vim_RO.pop("vim_tenant_name", None)
176 vim_RO["type"] = vim_RO.pop("vim_type")
177 vim_RO.pop("vim_user", None)
178 vim_RO.pop("vim_password", None)
179 desc = await RO.create("vim", descriptor=vim_RO)
180 RO_vim_id = desc["uuid"]
181 db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
182 self.update_db("vim_accounts", vim_id, db_vim)
183
184 step = "Attach vim to RO tenant"
185 vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
186 "vim_username": vim_content["vim_user"],
187 "vim_password": vim_content["vim_password"],
188 "config": vim_content["config"]
189 }
190 desc = await RO.attach_datacenter(RO_vim_id , descriptor=vim_RO)
191 db_vim["_admin"]["operationalState"] = "ENABLED"
192 self.update_db("vim_accounts", vim_id, db_vim)
193
194 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
195 return RO_vim_id
196
197 except (ROclient.ROClientException, DbException) as e:
198 self.logger.error(logging_text + "Exit Exception {}".format(e))
199 exc = e
200 except Exception as e:
201 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
202 exc = e
203 finally:
204 if exc and db_vim:
205 db_vim["_admin"]["operationalState"] = "ERROR"
206 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
207 self.update_db("vim_accounts", vim_id, db_vim)
208
209 async def vim_edit(self, vim_content, order_id):
210 vim_id = vim_content["_id"]
211 logging_text = "Task vim_edit={} ".format(vim_id)
212 self.logger.debug(logging_text + "Enter")
213 db_vim = None
214 exc = None
215 step = "Getting vim from db"
216 try:
217 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
218 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
219 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
220 step = "Editing vim at RO"
221 RO = ROclient.ROClient(self.loop, **self.ro_config)
222 vim_RO = deepcopy(vim_content)
223 vim_RO.pop("_id", None)
224 vim_RO.pop("_admin", None)
225 vim_RO.pop("schema_version", None)
226 vim_RO.pop("schema_type", None)
227 vim_RO.pop("vim_tenant_name", None)
228 vim_RO["type"] = vim_RO.pop("vim_type")
229 vim_RO.pop("vim_user", None)
230 vim_RO.pop("vim_password", None)
231 if vim_RO:
232 desc = await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
233
234 step = "Editing vim-account at RO tenant"
235 vim_RO = {}
236 for k in ("vim_tenant_name", "vim_password", "config"):
237 if k in vim_content:
238 vim_RO[k] = vim_content[k]
239 if "vim_user" in vim_content:
240 vim_content["vim_username"] = vim_content["vim_user"]
241 if vim_RO:
242 desc = await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
243 db_vim["_admin"]["operationalState"] = "ENABLED"
244 self.update_db("vim_accounts", vim_id, db_vim)
245
246 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
247 return RO_vim_id
248
249 except (ROclient.ROClientException, DbException) as e:
250 self.logger.error(logging_text + "Exit Exception {}".format(e))
251 exc = e
252 except Exception as e:
253 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
254 exc = e
255 finally:
256 if exc and db_vim:
257 db_vim["_admin"]["operationalState"] = "ERROR"
258 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
259 self.update_db("vim_accounts", vim_id, db_vim)
260
261 async def vim_delete(self, vim_id, order_id):
262 logging_text = "Task vim_delete={} ".format(vim_id)
263 self.logger.debug(logging_text + "Enter")
264 db_vim = None
265 exc = None
266 step = "Getting vim from db"
267 try:
268 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
269 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
270 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
271 RO = ROclient.ROClient(self.loop, **self.ro_config)
272 step = "Detaching vim from RO tenant"
273 try:
274 await RO.detach_datacenter(RO_vim_id)
275 except ROclient.ROClientException as e:
276 if e.http_code == 404: # not found
277 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
278 else:
279 raise
280
281 step = "Deleting vim from RO"
282 try:
283 await RO.delete("vim", RO_vim_id)
284 except ROclient.ROClientException as e:
285 if e.http_code == 404: # not found
286 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
287 else:
288 raise
289 else:
290 # nothing to delete
291 self.logger.error(logging_text + "Skipping. There is not RO information at database")
292 self.db.del_one("vim_accounts", {"_id": vim_id})
293 self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
294 return None
295
296 except (ROclient.ROClientException, DbException) as e:
297 self.logger.error(logging_text + "Exit Exception {}".format(e))
298 exc = e
299 except Exception as e:
300 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
301 exc = e
302 finally:
303 if exc and db_vim:
304 db_vim["_admin"]["operationalState"] = "ERROR"
305 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
306 self.update_db("vim_accounts", vim_id, db_vim)
307
308 async def sdn_create(self, sdn_content, order_id):
309 sdn_id = sdn_content["_id"]
310 logging_text = "Task sdn_create={} ".format(sdn_id)
311 self.logger.debug(logging_text + "Enter")
312 db_sdn = None
313 exc = None
314 try:
315 step = "Getting sdn from db"
316 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
317 if "_admin" not in db_sdn:
318 db_sdn["_admin"] = {}
319 if "deployed" not in db_sdn["_admin"]:
320 db_sdn["_admin"]["deployed"] = {}
321 db_sdn["_admin"]["deployed"]["RO"] = None
322
323 step = "Creating sdn at RO"
324 RO = ROclient.ROClient(self.loop, **self.ro_config)
325 sdn_RO = deepcopy(sdn_content)
326 sdn_RO.pop("_id", None)
327 sdn_RO.pop("_admin", None)
328 sdn_RO.pop("schema_version", None)
329 sdn_RO.pop("schema_type", None)
330 desc = await RO.create("sdn", descriptor=sdn_RO)
331 RO_sdn_id = desc["uuid"]
332 db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
333 db_sdn["_admin"]["operationalState"] = "ENABLED"
334 self.update_db("sdns", sdn_id, db_sdn)
335 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
336 return RO_sdn_id
337
338 except (ROclient.ROClientException, DbException) as e:
339 self.logger.error(logging_text + "Exit Exception {}".format(e))
340 exc = e
341 except Exception as e:
342 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
343 exc = e
344 finally:
345 if exc and db_sdn:
346 db_sdn["_admin"]["operationalState"] = "ERROR"
347 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
348 self.update_db("sdns", sdn_id, db_sdn)
349
350 async def sdn_edit(self, sdn_content, order_id):
351 sdn_id = sdn_content["_id"]
352 logging_text = "Task sdn_edit={} ".format(sdn_id)
353 self.logger.debug(logging_text + "Enter")
354 db_sdn = None
355 exc = None
356 step = "Getting sdn from db"
357 try:
358 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
359 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
360 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
361 RO = ROclient.ROClient(self.loop, **self.ro_config)
362 step = "Editing sdn at RO"
363 sdn_RO = deepcopy(sdn_content)
364 sdn_RO.pop("_id", None)
365 sdn_RO.pop("_admin", None)
366 sdn_RO.pop("schema_version", None)
367 sdn_RO.pop("schema_type", None)
368 if sdn_RO:
369 desc = await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
370 db_sdn["_admin"]["operationalState"] = "ENABLED"
371 self.update_db("sdns", sdn_id, db_sdn)
372
373 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
374 return RO_sdn_id
375
376 except (ROclient.ROClientException, DbException) as e:
377 self.logger.error(logging_text + "Exit Exception {}".format(e))
378 exc = e
379 except Exception as e:
380 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
381 exc = e
382 finally:
383 if exc and db_sdn:
384 db_sdn["_admin"]["operationalState"] = "ERROR"
385 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
386 self.update_db("sdns", sdn_id, db_sdn)
387
388 async def sdn_delete(self, sdn_id, order_id):
389 logging_text = "Task sdn_delete={} ".format(sdn_id)
390 self.logger.debug(logging_text + "Enter")
391 db_sdn = None
392 exc = None
393 step = "Getting sdn from db"
394 try:
395 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
396 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
397 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
398 RO = ROclient.ROClient(self.loop, **self.ro_config)
399 step = "Deleting sdn from RO"
400 try:
401 await RO.delete("sdn", RO_sdn_id)
402 except ROclient.ROClientException as e:
403 if e.http_code == 404: # not found
404 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
405 else:
406 raise
407 else:
408 # nothing to delete
409 self.logger.error(logging_text + "Skipping. There is not RO information at database")
410 self.db.del_one("sdns", {"_id": sdn_id})
411 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
412 return None
413
414 except (ROclient.ROClientException, DbException) as e:
415 self.logger.error(logging_text + "Exit Exception {}".format(e))
416 exc = e
417 except Exception as e:
418 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
419 exc = e
420 finally:
421 if exc and db_sdn:
422 db_sdn["_admin"]["operationalState"] = "ERROR"
423 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
424 self.update_db("sdns", sdn_id, db_sdn)
425
426 def vnfd2RO(self, vnfd, new_id=None):
427 """
428 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
429 :param vnfd: input vnfd
430 :param new_id: overrides vnf id if provided
431 :return: copy of vnfd
432 """
433 ci_file = None
434 try:
435 vnfd_RO = deepcopy(vnfd)
436 vnfd_RO.pop("_id", None)
437 vnfd_RO.pop("_admin", None)
438 if new_id:
439 vnfd_RO["id"] = new_id
440 for vdu in vnfd_RO["vdu"]:
441 if "cloud-init-file" in vdu:
442 base_folder = vnfd["_admin"]["storage"]
443 clout_init_file = "{}/{}/cloud_init/{}".format(
444 base_folder["folder"],
445 base_folder["pkg-dir"],
446 vdu["cloud-init-file"]
447 )
448 ci_file = self.fs.file_open(clout_init_file, "r")
449 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert to base 64 or similar
450 clout_init_content = ci_file.read()
451 ci_file.close()
452 ci_file = None
453 vdu.pop("cloud-init-file", None)
454 vdu["cloud-init"] = clout_init_content
455 return vnfd_RO
456 except FsException as e:
457 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
458 finally:
459 if ci_file:
460 ci_file.close()
461
462 def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, db_nslcmop, vnf_member_index, task=None):
463 """Update the lcm database with the status of the charm.
464
465 Updates the VNF's operational status with the state of the charm:
466 - blocked: The unit needs manual intervention
467 - maintenance: The unit is actively deploying/configuring
468 - waiting: The unit is waiting for another charm to be ready
469 - active: The unit is deployed, configured, and ready
470 - error: The charm has failed and needs attention.
471 - terminated: The charm has been destroyed
472 - removing,
473 - removed
474
475 Updates the network service's config-status to reflect the state of all
476 charms.
477 """
478 nsr_id = None
479 nslcmop_id = None
480 update_nsr = update_nslcmop = False
481 try:
482 nsr_id = db_nsr["_id"]
483 nslcmop_id = db_nslcmop["_id"]
484 nsr_lcm = db_nsr["_admin"]["deployed"]
485 ns_action = db_nslcmop["lcmOperationType"]
486 logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
487 vnf_member_index)
488
489 if task:
490 if task.cancelled():
491 self.logger.debug(logging_text + " task Cancelled")
492 # TODO update db_nslcmop
493 return
494
495 if task.done():
496 exc = task.exception()
497 if exc:
498 self.logger.error(logging_text + " task Exception={}".format(exc))
499 if ns_action in ("instantiate", "terminate"):
500 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
501 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
502 elif ns_action == "action":
503 db_nslcmop["operationState"] = "FAILED"
504 db_nslcmop["detailedStatus"] = str(exc)
505 db_nslcmop["statusEnteredTime"] = time()
506 update_nslcmop = True
507 return
508
509 else:
510 self.logger.debug(logging_text + " task Done")
511 # TODO revise with Adam if action is finished and ok when task is done
512 if ns_action == "action":
513 db_nslcmop["operationState"] = "COMPLETED"
514 db_nslcmop["detailedStatus"] = "Done"
515 db_nslcmop["statusEnteredTime"] = time()
516 update_nslcmop = True
517 # task is Done, but callback is still ongoing. So ignore
518 return
519 elif workload_status:
520 self.logger.debug(logging_text + " Enter workload_status={}".format(workload_status))
521 if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
522 return # same status, ignore
523 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
524 # TODO N2VC some error message in case of error should be obtained from N2VC
525 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
526 else:
527 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
528 return
529
530 some_failed = False
531 all_active = True
532 status_map = {}
533 for vnf_index, vca_info in nsr_lcm["VCA"].items():
534 vca_status = vca_info["operational-status"]
535 if vca_status not in status_map:
536 # Initialize it
537 status_map[vca_status] = 0
538 status_map[vca_status] += 1
539
540 if vca_status != "active":
541 all_active = False
542 if vca_status == "error":
543 some_failed = True
544 db_nsr["config-status"] = "failed"
545 error_text = "fail configuring vnf_index={} {}".format(vnf_member_index,
546 vca_info["detailed-status"])
547 db_nsr["detailed-status"] = error_text
548 db_nslcmop["operationState"] = "FAILED_TEMP"
549 db_nslcmop["detailedStatus"] = error_text
550 db_nslcmop["statusEnteredTime"] = time()
551 break
552
553 if all_active:
554 self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index))
555 db_nsr["config-status"] = "configured"
556 db_nsr["detailed-status"] = "done"
557 db_nslcmop["operationState"] = "COMPLETED"
558 db_nslcmop["detailedStatus"] = "Done"
559 db_nslcmop["statusEnteredTime"] = time()
560 elif some_failed:
561 pass
562 else:
563 cs = "configuring: "
564 separator = ""
565 for status, num in status_map.items():
566 cs += separator + "{}: {}".format(status, num)
567 separator = ", "
568 db_nsr["config-status"] = cs
569 db_nslcmop["detailedStatus"] = cs
570 update_nsr = update_nslcmop = True
571
572 except Exception as e:
573 self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
574 finally:
575 try:
576 if update_nslcmop:
577 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
578 if update_nsr:
579 self.update_db("nsrs", nsr_id, db_nsr)
580 except Exception as e:
581 self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
582 vnf_member_index, e), exc_info=True)
583
584 def ns_params_2_RO(self, ns_params):
585 """
586 Creates a RO ns descriptor from OSM ns_instantite params
587 :param ns_params: OSM instantiate params
588 :return: The RO ns descriptor
589 """
590 vim_2_RO = {}
591 def vim_account_2_RO(vim_account):
592 if vim_account in vim_2_RO:
593 return vim_2_RO[vim_account]
594 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
595 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
596 # #TODO check if VIM is creating and wait
597 if db_vim["_admin"]["operationalState"] != "ENABLED":
598 raise LcmException("VIM={} is not available. operationalSstatus={}".format(
599 vim_account, db_vim["_admin"]["operationalState"]))
600 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
601 vim_2_RO[vim_account] = RO_vim_id
602 return RO_vim_id
603
604 if not ns_params:
605 return None
606 RO_ns_params = {
607 # "name": ns_params["nsName"],
608 # "description": ns_params.get("nsDescription"),
609 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
610 # "scenario": ns_params["nsdId"],
611 "vnfs": {},
612 "networks": {},
613 }
614 if ns_params.get("ssh-authorized-key"):
615 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}
616 if ns_params.get("vnf"):
617 for vnf in ns_params["vnf"]:
618 RO_vnf = {}
619 if "vimAccountId" in vnf:
620 RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"])
621 if RO_vnf:
622 RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf
623 if ns_params.get("vld"):
624 for vld in ns_params["vld"]:
625 RO_vld = {}
626 if "ip-profile" in vld:
627 RO_vld["ip-profile"] = vld["ip-profile"]
628 if "vim-network-name" in vld:
629 RO_vld["sites"] = []
630 if isinstance(vld["vim-network-name"], dict):
631 for vim_account, vim_net in vld["vim-network-name"].items():
632 RO_vld["sites"].append({
633 "netmap-use": vim_net,
634 "datacenter": vim_account_2_RO(vim_account)
635 })
636 else: #isinstance str
637 RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]})
638 if RO_vld:
639 RO_ns_params["networks"][vld["name"]] = RO_vld
640 return RO_ns_params
641
642 async def ns_instantiate(self, nsr_id, nslcmop_id):
643 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
644 self.logger.debug(logging_text + "Enter")
645 # get all needed from database
646 db_nsr = None
647 db_nslcmop = None
648 exc = None
649 step = "Getting nsr, nslcmop, RO_vims from db"
650 try:
651 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
652 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
653 nsd = db_nsr["nsd"]
654 nsr_name = db_nsr["name"] # TODO short-name??
655
656 needed_vnfd = {}
657 for c_vnf in nsd["constituent-vnfd"]:
658 vnfd_id = c_vnf["vnfd-id-ref"]
659 if vnfd_id not in needed_vnfd:
660 step = "Getting vnfd={} from db".format(vnfd_id)
661 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
662
663 nsr_lcm = db_nsr["_admin"].get("deployed")
664 if not nsr_lcm:
665 nsr_lcm = db_nsr["_admin"]["deployed"] = {
666 "id": nsr_id,
667 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
668 "nsr_ip": {},
669 "VCA": {},
670 }
671 db_nsr["detailed-status"] = "creating"
672 db_nsr["operational-status"] = "init"
673
674 RO = ROclient.ROClient(self.loop, **self.ro_config)
675
676 # get vnfds, instantiate at RO
677 for vnfd_id, vnfd in needed_vnfd.items():
678 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
679 self.logger.debug(logging_text + step)
680 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
681
682 # look if present
683 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
684 if vnfd_list:
685 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
686 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
687 vnfd_id, vnfd_list[0]["uuid"]))
688 else:
689 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
690 desc = await RO.create("vnfd", descriptor=vnfd_RO)
691 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
692 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
693 self.update_db("nsrs", nsr_id, db_nsr)
694
695 # create nsd at RO
696 nsd_id = nsd["id"]
697 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
698 self.logger.debug(logging_text + step)
699
700 nsd_id_RO = nsd_id + "." + nsd_id[:200]
701 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
702 if nsd_list:
703 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
704 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
705 nsd_id, nsd_list[0]["uuid"]))
706 else:
707 nsd_RO = deepcopy(nsd)
708 nsd_RO["id"] = nsd_id_RO
709 nsd_RO.pop("_id", None)
710 nsd_RO.pop("_admin", None)
711 for c_vnf in nsd_RO["constituent-vnfd"]:
712 vnfd_id = c_vnf["vnfd-id-ref"]
713 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
714 desc = await RO.create("nsd", descriptor=nsd_RO)
715 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
716 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
717 self.update_db("nsrs", nsr_id, db_nsr)
718
719 # Crate ns at RO
720 # if present use it unless in error status
721 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
722 if RO_nsr_id:
723 try:
724 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
725 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
726 desc = await RO.show("ns", RO_nsr_id)
727 except ROclient.ROClientException as e:
728 if e.http_code != HTTPStatus.NOT_FOUND:
729 raise
730 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
731 if RO_nsr_id:
732 ns_status, ns_status_info = RO.check_ns_status(desc)
733 nsr_lcm["RO"]["nsr_status"] = ns_status
734 if ns_status == "ERROR":
735 step = db_nsr["detailed-status"] = "Deleting ns at RO"
736 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
737 await RO.delete("ns", RO_nsr_id)
738 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
739
740 if not RO_nsr_id:
741 step = db_nsr["detailed-status"] = "Creating ns at RO"
742 self.logger.debug(logging_text + step)
743 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
744 desc = await RO.create("ns", descriptor=RO_ns_params,
745 name=db_nsr["name"],
746 scenario=nsr_lcm["RO"]["nsd_id"])
747 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
748 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
749 nsr_lcm["RO"]["nsr_status"] = "BUILD"
750 self.update_db("nsrs", nsr_id, db_nsr)
751
752 # wait until NS is ready
753 step = ns_status_detailed = "Waiting ns ready at RO"
754 db_nsr["detailed-status"] = ns_status_detailed
755 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
756 deployment_timeout = 2*3600 # Two hours
757 while deployment_timeout > 0:
758 desc = await RO.show("ns", RO_nsr_id)
759 ns_status, ns_status_info = RO.check_ns_status(desc)
760 nsr_lcm["RO"]["nsr_status"] = ns_status
761 if ns_status == "ERROR":
762 raise ROclient.ROClientException(ns_status_info)
763 elif ns_status == "BUILD":
764 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
765 self.update_db("nsrs", nsr_id, db_nsr)
766 elif ns_status == "ACTIVE":
767 step = "Getting ns VNF management IP address"
768 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
769 break
770 else:
771 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
772
773 await asyncio.sleep(5, loop=self.loop)
774 deployment_timeout -= 5
775 if deployment_timeout <= 0:
776 raise ROclient.ROClientException("Timeout waiting ns to be ready")
777 db_nsr["detailed-status"] = "Configuring vnfr"
778 self.update_db("nsrs", nsr_id, db_nsr)
779
780 # The parameters we'll need to deploy a charm
781 number_to_configure = 0
782
783 def deploy():
784 """An inner function to deploy the charm from either vnf or vdu
785 """
786
787 # Login to the VCA.
788 # if number_to_configure == 0:
789 # self.logger.debug("Logging into N2VC...")
790 # task = asyncio.ensure_future(self.n2vc.login())
791 # yield from asyncio.wait_for(task, 30.0)
792 # self.logger.debug("Logged into N2VC!")
793
794 ## await self.n2vc.login()
795
796 # Note: The charm needs to exist on disk at the location
797 # specified by charm_path.
798 base_folder = vnfd["_admin"]["storage"]
799 storage_params = self.fs.get_params()
800 charm_path = "{}{}/{}/charms/{}".format(
801 storage_params["path"],
802 base_folder["folder"],
803 base_folder["pkg-dir"],
804 proxy_charm
805 )
806
807 # Setup the runtime parameters for this VNF
808 params['rw_mgmt_ip'] = nsr_lcm['nsr_ip']["vnf"][vnf_index]
809
810 # ns_name will be ignored in the current version of N2VC
811 # but will be implemented for the next point release.
812 model_name = 'default'
813 application_name = self.n2vc.FormatApplicationName(
814 nsr_name,
815 vnf_index,
816 vnfd['name'],
817 )
818
819 nsr_lcm["VCA"][vnf_index] = {
820 "model": model_name,
821 "application": application_name,
822 "operational-status": "init",
823 "detailed-status": "",
824 "vnfd_id": vnfd_id,
825 }
826
827 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
828 task = asyncio.ensure_future(
829 self.n2vc.DeployCharms(
830 model_name, # The network service name
831 application_name, # The application name
832 vnfd, # The vnf descriptor
833 charm_path, # Path to charm
834 params, # Runtime params, like mgmt ip
835 {}, # for native charms only
836 self.n2vc_callback, # Callback for status changes
837 db_nsr, # Callback parameter
838 db_nslcmop,
839 vnf_index, # Callback parameter
840 None, # Callback parameter (task)
841 )
842 )
843 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
844 db_nsr, db_nslcmop, vnf_index))
845 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
846
847 # TODO: Make this call inside deploy()
848 # Login to the VCA. If there are multiple calls to login(),
849 # subsequent calls will be a nop and return immediately.
850 await self.n2vc.login()
851
852 step = "Looking for needed vnfd to configure"
853 self.logger.debug(logging_text + step)
854 for c_vnf in nsd["constituent-vnfd"]:
855 vnfd_id = c_vnf["vnfd-id-ref"]
856 vnf_index = str(c_vnf["member-vnf-index"])
857 vnfd = needed_vnfd[vnfd_id]
858
859 # Check if this VNF has a charm configuration
860 vnf_config = vnfd.get("vnf-configuration")
861
862 if vnf_config and vnf_config.get("juju"):
863 proxy_charm = vnf_config["juju"]["charm"]
864 params = {}
865
866 if proxy_charm:
867 if 'initial-config-primitive' in vnf_config:
868 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
869
870 deploy()
871 number_to_configure += 1
872
873 # Deploy charms for each VDU that supports one.
874 for vdu in vnfd['vdu']:
875 vdu_config = vdu.get('vdu-configuration')
876 proxy_charm = None
877 params = {}
878
879 if vdu_config and vdu_config.get("juju"):
880 proxy_charm = vdu_config["juju"]["charm"]
881
882 if 'initial-config-primitive' in vdu_config:
883 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
884
885 if proxy_charm:
886 deploy()
887 number_to_configure += 1
888
889 if number_to_configure:
890 db_nsr["config-status"] = "configuring"
891 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
892 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
893 else:
894 db_nslcmop["operationState"] = "COMPLETED"
895 db_nslcmop["detailed-status"] = "done"
896 db_nsr["config-status"] = "configured"
897 db_nsr["detailed-status"] = "done"
898 db_nsr["operational-status"] = "running"
899 self.update_db("nsrs", nsr_id, db_nsr)
900 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
901 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
902 return nsr_lcm
903
904 except (ROclient.ROClientException, DbException, LcmException) as e:
905 self.logger.error(logging_text + "Exit Exception {}".format(e))
906 exc = e
907 except Exception as e:
908 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
909 exc = e
910 finally:
911 if exc:
912 if db_nsr:
913 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
914 db_nsr["operational-status"] = "failed"
915 self.update_db("nsrs", nsr_id, db_nsr)
916 if db_nslcmop:
917 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
918 db_nslcmop["operationState"] = "FAILED"
919 db_nslcmop["statusEnteredTime"] = time()
920 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
921
922 async def ns_terminate(self, nsr_id, nslcmop_id):
923 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
924 self.logger.debug(logging_text + "Enter")
925 db_nsr = None
926 db_nslcmop = None
927 exc = None
928 step = "Getting nsr, nslcmop from db"
929 failed_detail = [] # annotates all failed error messages
930 vca_task_list = []
931 vca_task_dict = {}
932 try:
933 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
934 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
935 # nsd = db_nsr["nsd"]
936 nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
937 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
938 return
939 # TODO ALF remove
940 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
941 # #TODO check if VIM is creating and wait
942 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
943
944 db_nsr_update = {
945 "operational-status": "terminating",
946 "config-status": "terminating",
947 "detailed-status": "Deleting charms",
948 }
949 self.update_db_2("nsrs", nsr_id, db_nsr_update)
950
951 try:
952 self.logger.debug(logging_text + step)
953 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
954 if deploy_info and deploy_info.get("application"):
955 task = asyncio.ensure_future(
956 self.n2vc.RemoveCharms(
957 deploy_info['model'],
958 deploy_info['application'],
959 # self.n2vc_callback,
960 # db_nsr,
961 # db_nslcmop,
962 # vnf_index,
963 )
964 )
965 vca_task_list.append(task)
966 vca_task_dict[vnf_index] = task
967 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
968 # deploy_info['application'], None, db_nsr,
969 # db_nslcmop, vnf_index))
970 self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
971 except Exception as e:
972 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
973 # remove from RO
974
975 RO = ROclient.ROClient(self.loop, **self.ro_config)
976 # Delete ns
977 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
978 if RO_nsr_id:
979 try:
980 step = db_nsr["detailed-status"] = "Deleting ns at RO"
981 self.logger.debug(logging_text + step)
982 desc = await RO.delete("ns", RO_nsr_id)
983 nsr_lcm["RO"]["nsr_id"] = None
984 nsr_lcm["RO"]["nsr_status"] = "DELETED"
985 except ROclient.ROClientException as e:
986 if e.http_code == 404: # not found
987 nsr_lcm["RO"]["nsr_id"] = None
988 nsr_lcm["RO"]["nsr_status"] = "DELETED"
989 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
990 elif e.http_code == 409: #conflict
991 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
992 self.logger.debug(logging_text + failed_detail[-1])
993 else:
994 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
995 self.logger.error(logging_text + failed_detail[-1])
996
997 # Delete nsd
998 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
999 if RO_nsd_id:
1000 try:
1001 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
1002 desc = await RO.delete("nsd", RO_nsd_id)
1003 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1004 nsr_lcm["RO"]["nsd_id"] = None
1005 except ROclient.ROClientException as e:
1006 if e.http_code == 404: # not found
1007 nsr_lcm["RO"]["nsd_id"] = None
1008 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1009 elif e.http_code == 409: #conflict
1010 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1011 self.logger.debug(logging_text + failed_detail[-1])
1012 else:
1013 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1014 self.logger.error(logging_text + failed_detail[-1])
1015
1016 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1017 if not RO_vnfd_id:
1018 continue
1019 try:
1020 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
1021 desc = await RO.delete("vnfd", RO_vnfd_id)
1022 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1023 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1024 except ROclient.ROClientException as e:
1025 if e.http_code == 404: # not found
1026 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1027 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1028 elif e.http_code == 409: #conflict
1029 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1030 self.logger.debug(logging_text + failed_detail[-1])
1031 else:
1032 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1033 self.logger.error(logging_text + failed_detail[-1])
1034
1035 if vca_task_list:
1036 await asyncio.wait(vca_task_list, timeout=300)
1037 for vnf_index, task in vca_task_dict.items():
1038 if task.cancelled():
1039 failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
1040 elif task.done():
1041 exc = task.exception()
1042 if exc:
1043 failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc))
1044 else:
1045 nsr_lcm["VCA"][vnf_index] = None
1046 else: # timeout
1047 # TODO Should it be cancelled?!!
1048 task.cancel()
1049 failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))
1050
1051 if failed_detail:
1052 self.logger.error(logging_text + " ;".join(failed_detail))
1053 db_nsr_update = {
1054 "operational-status": "failed",
1055 "detailed-status": "Deletion errors " + "; ".join(failed_detail),
1056 "_admin": {"deployed": nsr_lcm, }
1057 }
1058 db_nslcmop_update = {
1059 "detailedStatus": "; ".join(failed_detail),
1060 "operationState": "FAILED",
1061 "statusEnteredTime": time()
1062 }
1063 elif db_nslcmop["operationParams"].get("autoremove"):
1064 self.db.del_one("nsrs", {"_id": nsr_id})
1065 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1066 else:
1067 db_nsr_update = {
1068 "operational-status": "terminated",
1069 "detailed-status": "Done",
1070 "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
1071 }
1072 db_nslcmop_update = {
1073 "detailedStatus": "Done",
1074 "operationState": "COMPLETED",
1075 "statusEnteredTime": time()
1076 }
1077 self.logger.debug(logging_text + "Exit")
1078
1079 except (ROclient.ROClientException, DbException) as e:
1080 self.logger.error(logging_text + "Exit Exception {}".format(e))
1081 exc = e
1082 except Exception as e:
1083 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1084 exc = e
1085 finally:
1086 if exc and db_nslcmop:
1087 db_nslcmop_update = {
1088 "detailed-status": "FAILED {}: {}".format(step, exc),
1089 "operationState": "FAILED",
1090 "statusEnteredTime": time(),
1091 }
1092 if db_nslcmop_update:
1093 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1094 if db_nsr_update:
1095 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1096
1097 async def ns_action(self, nsr_id, nslcmop_id):
1098 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1099 self.logger.debug(logging_text + "Enter")
1100 # get all needed from database
1101 db_nsr = None
1102 db_nslcmop = None
1103 db_nslcmop_update = None
1104 exc = None
1105 step = "Getting nsr, nslcmop"
1106 try:
1107 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1108 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1109 nsr_lcm = db_nsr["_admin"].get("deployed")
1110 vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]
1111
1112 #TODO check if ns is in a proper status
1113 vca_deployed = nsr_lcm["VCA"].get(vnf_index)
1114 if not vca_deployed:
1115 raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
1116 model_name = vca_deployed.get("model")
1117 application_name = vca_deployed.get("application")
1118 if not model_name or not application_name:
1119 raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
1120 if vca_deployed["operational-status"] != "active":
1121 raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
1122 vnf_index, vca_deployed["operational-status"]))
1123 primitive = db_nslcmop["operationParams"]["primitive"]
1124 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1125 callback = None # self.n2vc_callback
1126 callback_args = () # [db_nsr, db_nslcmop, vnf_index, None]
1127 await self.n2vc.login()
1128 task = asyncio.ensure_future(
1129 self.n2vc.ExecutePrimitive(
1130 model_name,
1131 application_name,
1132 primitive, callback,
1133 *callback_args,
1134 **primitive_params
1135 )
1136 )
1137 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1138 # db_nsr, db_nslcmop, vnf_index))
1139 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1140 # wait until completed with timeout
1141 await asyncio.wait((task,), timeout=300)
1142
1143 result = "FAILED" # by default
1144 result_detail = ""
1145 if task.cancelled():
1146 db_nslcmop["detailedStatus"] = "Task has been cancelled"
1147 elif task.done():
1148 exc = task.exception()
1149 if exc:
1150 result_detail = str(exc)
1151 else:
1152 self.logger.debug(logging_text + " task Done")
1153 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1154 result = "COMPLETED"
1155 result_detail = "Done"
1156 else: # timeout
1157 # TODO Should it be cancelled?!!
1158 task.cancel()
1159 result_detail = "timeout"
1160
1161 db_nslcmop_update = {
1162 "detailedStatus": result_detail,
1163 "operationState": result,
1164 "statusEnteredTime": time()
1165 }
1166 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1167 return # database update is called inside finally
1168
1169 except (DbException, LcmException) as e:
1170 self.logger.error(logging_text + "Exit Exception {}".format(e))
1171 exc = e
1172 except Exception as e:
1173 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1174 exc = e
1175 finally:
1176 if exc and db_nslcmop:
1177 db_nslcmop_update = {
1178 "detailed-status": "FAILED {}: {}".format(step, exc),
1179 "operationState": "FAILED",
1180 "statusEnteredTime": time(),
1181 }
1182 if db_nslcmop_update:
1183 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1184
1185 async def test(self, param=None):
1186 self.logger.debug("Starting/Ending test task: {}".format(param))
1187
1188 def cancel_tasks(self, topic, _id):
1189 """
1190 Cancel all active tasks of a concrete nsr or vim identified for _id
1191 :param topic: can be ns or vim_account
1192 :param _id: nsr or vim identity
1193 :return: None, or raises an exception if not possible
1194 """
1195 if topic == "ns":
1196 lcm_tasks = self.lcm_ns_tasks
1197 elif topic== "vim_account":
1198 lcm_tasks = self.lcm_vim_tasks
1199 elif topic== "sdn":
1200 lcm_tasks = self.lcm_sdn_tasks
1201
1202 if not lcm_tasks.get(_id):
1203 return
1204 for order_id, tasks_set in lcm_tasks[_id].items():
1205 for task_name, task in tasks_set.items():
1206 result = task.cancel()
1207 if result:
1208 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
1209 lcm_tasks[_id] = {}
1210
1211 async def kafka_ping(self):
1212 self.logger.debug("Task kafka_ping Enter")
1213 consecutive_errors = 0
1214 first_start = True
1215 kafka_has_received = False
1216 self.pings_not_received = 1
1217 while True:
1218 try:
1219 await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
1220 # time between pings are low when it is not received and at starting
1221 wait_time = 5 if not kafka_has_received else 120
1222 if not self.pings_not_received:
1223 kafka_has_received = True
1224 self.pings_not_received += 1
1225 await asyncio.sleep(wait_time, loop=self.loop)
1226 if self.pings_not_received > 10:
1227 raise LcmException("It is not receiving pings from Kafka bus")
1228 consecutive_errors = 0
1229 first_start = False
1230 except LcmException:
1231 raise
1232 except Exception as e:
1233 # if not first_start is the first time after starting. So leave more time and wait
1234 # to allow kafka starts
1235 if consecutive_errors == 8 if not first_start else 30:
1236 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1237 raise
1238 consecutive_errors += 1
1239 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1240 wait_time = 1 if not first_start else 5
1241 await asyncio.sleep(wait_time, loop=self.loop)
1242
1243 async def kafka_read(self):
1244 self.logger.debug("Task kafka_read Enter")
1245 order_id = 1
1246 # future = asyncio.Future()
1247 consecutive_errors = 0
1248 first_start = True
1249 while consecutive_errors < 10:
1250 try:
1251 topics = ("admin", "ns", "vim_account", "sdn")
1252 topic, command, params = await self.msg.aioread(topics, self.loop)
1253 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
1254 consecutive_errors = 0
1255 first_start = False
1256 order_id += 1
1257 if command == "exit":
1258 print("Bye!")
1259 break
1260 elif command.startswith("#"):
1261 continue
1262 elif command == "echo":
1263 # just for test
1264 print(params)
1265 sys.stdout.flush()
1266 continue
1267 elif command == "test":
1268 asyncio.Task(self.test(params), loop=self.loop)
1269 continue
1270
1271 if topic == "admin":
1272 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
1273 self.pings_not_received = 0
1274 continue
1275 elif topic == "ns":
1276 if command == "instantiate":
1277 # self.logger.debug("Deploying NS {}".format(nsr_id))
1278 nslcmop = params
1279 nslcmop_id = nslcmop["_id"]
1280 nsr_id = nslcmop["nsInstanceId"]
1281 task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
1282 if nsr_id not in self.lcm_ns_tasks:
1283 self.lcm_ns_tasks[nsr_id] = {}
1284 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task}
1285 continue
1286 elif command == "terminate":
1287 # self.logger.debug("Deleting NS {}".format(nsr_id))
1288 nslcmop = params
1289 nslcmop_id = nslcmop["_id"]
1290 nsr_id = nslcmop["nsInstanceId"]
1291 self.cancel_tasks(topic, nsr_id)
1292 task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
1293 if nsr_id not in self.lcm_ns_tasks:
1294 self.lcm_ns_tasks[nsr_id] = {}
1295 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task}
1296 continue
1297 elif command == "action":
1298 # self.logger.debug("Update NS {}".format(nsr_id))
1299 nslcmop = params
1300 nslcmop_id = nslcmop["_id"]
1301 nsr_id = nslcmop["nsInstanceId"]
1302 task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
1303 if nsr_id not in self.lcm_ns_tasks:
1304 self.lcm_ns_tasks[nsr_id] = {}
1305 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task}
1306 continue
1307 elif command == "show":
1308 try:
1309 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1310 print(
1311 "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
1312 "{}\n deploy: {}\n tasks: {}".format(
1313 nsr_id, db_nsr["operational-status"],
1314 db_nsr["config-status"], db_nsr["detailed-status"],
1315 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
1316 except Exception as e:
1317 print("nsr {} not found: {}".format(nsr_id, e))
1318 sys.stdout.flush()
1319 continue
1320 elif command == "deleted":
1321 continue # TODO cleaning of task just in case should be done
1322 elif topic == "vim_account":
1323 vim_id = params["_id"]
1324 if command == "create":
1325 task = asyncio.ensure_future(self.vim_create(params, order_id))
1326 if vim_id not in self.lcm_vim_tasks:
1327 self.lcm_vim_tasks[vim_id] = {}
1328 self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task}
1329 continue
1330 elif command == "delete":
1331 self.cancel_tasks(topic, vim_id)
1332 task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
1333 if vim_id not in self.lcm_vim_tasks:
1334 self.lcm_vim_tasks[vim_id] = {}
1335 self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task}
1336 continue
1337 elif command == "show":
1338 print("not implemented show with vim_account")
1339 sys.stdout.flush()
1340 continue
1341 elif command == "edit":
1342 task = asyncio.ensure_future(self.vim_edit(vim_id, order_id))
1343 if vim_id not in self.lcm_vim_tasks:
1344 self.lcm_vim_tasks[vim_id] = {}
1345 self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task}
1346 continue
1347 elif topic == "sdn":
1348 _sdn_id = params["_id"]
1349 if command == "create":
1350 task = asyncio.ensure_future(self.sdn_create(params, order_id))
1351 if _sdn_id not in self.lcm_sdn_tasks:
1352 self.lcm_sdn_tasks[_sdn_id] = {}
1353 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task}
1354 continue
1355 elif command == "delete":
1356 self.cancel_tasks(topic, _sdn_id)
1357 task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
1358 if _sdn_id not in self.lcm_sdn_tasks:
1359 self.lcm_sdn_tasks[_sdn_id] = {}
1360 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task}
1361 continue
1362 elif command == "edit":
1363 task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id))
1364 if _sdn_id not in self.lcm_sdn_tasks:
1365 self.lcm_sdn_tasks[_sdn_id] = {}
1366 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task}
1367 continue
1368 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1369 except Exception as e:
1370 # if not first_start is the first time after starting. So leave more time and wait
1371 # to allow kafka starts
1372 if consecutive_errors == 8 if not first_start else 30:
1373 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1374 raise
1375 consecutive_errors += 1
1376 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1377 wait_time = 2 if not first_start else 5
1378 await asyncio.sleep(wait_time, loop=self.loop)
1379
1380 # self.logger.debug("Task kafka_read terminating")
1381 self.logger.debug("Task kafka_read exit")
1382
1383 def start(self):
1384 self.loop = asyncio.get_event_loop()
1385 self.loop.run_until_complete(asyncio.gather(
1386 self.kafka_read(),
1387 self.kafka_ping()
1388 ))
1389 # TODO
1390 # self.logger.debug("Terminating cancelling creation tasks")
1391 # self.cancel_tasks("ALL", "create")
1392 # timeout = 200
1393 # while self.is_pending_tasks():
1394 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1395 # await asyncio.sleep(2, loop=self.loop)
1396 # timeout -= 2
1397 # if not timeout:
1398 # self.cancel_tasks("ALL", "ALL")
1399 self.loop.close()
1400 self.loop = None
1401 if self.db:
1402 self.db.db_disconnect()
1403 if self.msg:
1404 self.msg.disconnect()
1405 if self.fs:
1406 self.fs.fs_disconnect()
1407
1408
1409 def read_config_file(self, config_file):
1410 # TODO make a [ini] + yaml inside parser
1411 # the configparser library is not suitable, because it does not admit comments at the end of line,
1412 # and not parse integer or boolean
1413 try:
1414 with open(config_file) as f:
1415 conf = yaml.load(f)
1416 for k, v in environ.items():
1417 if not k.startswith("OSMLCM_"):
1418 continue
1419 k_items = k.lower().split("_")
1420 c = conf
1421 try:
1422 for k_item in k_items[1:-1]:
1423 if k_item in ("ro", "vca"):
1424 # put in capital letter
1425 k_item = k_item.upper()
1426 c = c[k_item]
1427 if k_items[-1] == "port":
1428 c[k_items[-1]] = int(v)
1429 else:
1430 c[k_items[-1]] = v
1431 except Exception as e:
1432 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1433
1434 return conf
1435 except Exception as e:
1436 self.logger.critical("At config file '{}': {}".format(config_file, e))
1437 exit(1)
1438
1439
1440 if __name__ == '__main__':
1441
1442 config_file = "lcm.cfg"
1443 lcm = Lcm(config_file)
1444
1445 lcm.start()