AWS vimconn: Allow config:flavor_data as str
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 from n2vc import version as N2VC_version
22 # import os.path
23 # import time
24
25 from copy import deepcopy
26 from http import HTTPStatus
27 from time import time
28
29
class LcmException(Exception):
    """Exception raised by the Lcm class for errors detected by LCM itself."""
32
33
34 class Lcm:
35
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration file reference handed to self.read_config_file. The loaded configuration
        must be a two level dictionary with the sections 'global', 'RO', 'VCA', 'database', 'storage' and
        'message'
    :return: None
    :raises LcmException: on an incompatible N2VC version, an unknown 'driver' value, or when connecting to
        database, storage or messaging fails
    """
    # "import logging" alone does not load the logging.handlers submodule, so bring the handler class into
    # scope explicitly instead of relying on some other module having imported it
    from logging.handlers import RotatingFileHandler

    self.db = None
    self.msg = None
    self.fs = None
    self.pings_not_received = 1

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = RotatingFileHandler(config["global"]["logfile"],
                                           maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules. The logger name is stored in each section so the drivers can read it from the
    # configuration passed to their connect methods
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = RotatingFileHandler(config[k1]["logfile"],
                                               maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )
    # check version of N2VC
    # compare numerically when possible: as plain strings "0.0.10" would sort before "0.0.2"
    try:
        n2vc_too_old = tuple(int(part) for part in N2VC_version.split(".")) < (0, 0, 2)
    except (AttributeError, ValueError):
        # not a dotted numeric string; keep the plain comparison
        n2vc_too_old = N2VC_version < "0.0.2"
    if n2vc_too_old:
        raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
    try:
        if config["database"]["driver"] == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif config["database"]["driver"] == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                config["database"]["driver"]))

        if config["storage"]["driver"] == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                config["storage"]["driver"]))

        if config["message"]["driver"] == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif config["message"]["driver"] == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending value of the 'message' section (not the 'storage' one)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                config["message"]["driver"]))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e)) from e
145
def update_db(self, item, _id, _desc):
    """
    Replaces a whole database entry. A DbException is logged as error and not propagated
    :param item: database table/collection name
    :param _id: identifier of the entry to replace
    :param _desc: new content of the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
151
def update_db_2(self, item, _id, _desc):
    """
    Updates a database entry through db.set_one, selecting it by "_id". A DbException is logged as error
    and not propagated
    :param item: database table/collection name
    :param _id: identifier of the entry to update
    :param _desc: content passed to db.set_one as update
    :return: None
    """
    selector = {"_id": _id}
    try:
        self.db.set_one(item, selector, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
157
async def vim_create(self, vim_content, order_id):
    """
    Creates a vim at RO, attaches it to the RO tenant and stores the result at the "vim_accounts" entry
    :param vim_content: vim account content. "_id", "vim_type", "vim_tenant_name", "vim_user", "vim_password"
        and "config" are read from it
    :param order_id: not used by this method
    :return: RO identifier of the created vim. None if some step fails; in that case the error is logged and
        stored at the database entry (operationalState "ERROR")
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deployed" not in db_vim["_admin"]:
            db_vim["_admin"]["deployed"] = {}
        db_vim["_admin"]["deployed"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # the RO vim descriptor is the OSM content without internal fields and without the account
        # credentials, that are sent later on when attaching the vim to the tenant
        vim_RO = deepcopy(vim_content)
        vim_RO.pop("_id", None)
        vim_RO.pop("_admin", None)
        vim_RO.pop("schema_version", None)
        vim_RO.pop("schema_type", None)
        vim_RO.pop("vim_tenant_name", None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        vim_RO.pop("vim_user", None)
        vim_RO.pop("vim_password", None)
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
        # stored right now, so that the RO identifier is kept even if attaching fails
        self.update_db("vim_accounts", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_account_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
            "config": vim_content["config"],
        }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # "step" tells which stage was in progress when the error happened
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
213
async def vim_edit(self, vim_content, order_id):
    """
    Applies the changes of a vim account at RO: first the vim itself, then the vim-account (tenant name,
    user, password, config). Nothing is sent to RO if the database entry has no RO identifier
    :param vim_content: content to edit. Must contain "_id"; the rest of the fields are optional
    :param order_id: not used by this method
    :return: RO identifier of the vim, or None if the vim is not deployed at RO or some step fails. On failure
        the error is logged and stored at the database entry (operationalState "ERROR")
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # stays None when there is no RO information, so the exit log/return cannot fail
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            # an edition may not include the type; rename it only when present
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_account_RO = {}
            for k in ("vim_tenant_name", "vim_password", "config"):
                if k in vim_content:
                    vim_account_RO[k] = vim_content[k]
            if "vim_user" in vim_content:
                # RO names this field "vim_username". It goes into the descriptor sent to RO; the input
                # content is left untouched
                vim_account_RO["vim_username"] = vim_content["vim_user"]
            if vim_account_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # the entry may lack "_admin"; create it rather than failing inside the error handler
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
265
async def vim_delete(self, vim_id, order_id):
    """
    Detaches the vim from the RO tenant, deletes it from RO and finally deletes the "vim_accounts" entry.
    A 404 answer from RO is taken as already detached/deleted. The database entry is also deleted when it
    has no RO information
    :param vim_id: identifier of the "vim_accounts" entry
    :param order_id: not used by this method
    :return: None. On failure the error is logged and stored at the database entry (operationalState "ERROR")
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vim_accounts", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # the entry may lack "_admin" (see the condition above); create it rather than failing inside
            # the error handler
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
312
async def sdn_create(self, sdn_content, order_id):
    """
    Creates a sdn at RO and stores the RO identifier at the "sdns" database entry
    :param sdn_content: sdn content. Must contain "_id"
    :param order_id: not used by this method
    :return: RO identifier of the created sdn. None if some step fails; in that case the error is logged and
        stored at the database entry (operationalState "ERROR")
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deployed" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deployed"] = {}
        db_sdn["_admin"]["deployed"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # the RO descriptor is the OSM content without the fields removed below
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        sdn_RO.pop("description", None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # "step" tells which stage was in progress when the error happened
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
355
async def sdn_edit(self, sdn_content, order_id):
    """
    Applies the changes of a sdn at RO. Nothing is sent to RO if the database entry has no RO identifier
    :param sdn_content: content to edit. Must contain "_id"
    :param order_id: not used by this method
    :return: RO identifier of the sdn, or None if the sdn is not deployed at RO or some step fails. On failure
        the error is logged and stored at the database entry (operationalState "ERROR")
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # stays None when there is no RO information, so the exit log/return cannot fail
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # the entry may lack "_admin"; create it rather than failing inside the error handler
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
394
async def sdn_delete(self, sdn_id, order_id):
    """
    Deletes the sdn from RO and then deletes the "sdns" database entry. A 404 answer from RO is taken as
    already deleted. The database entry is also deleted when it has no RO information
    :param sdn_id: identifier of the "sdns" entry
    :param order_id: not used by this method
    :return: None. On failure the error is logged and stored at the database entry (operationalState "ERROR")
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # the entry may lack "_admin" (see the condition above); create it rather than failing inside
            # the error handler
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
432
def vnfd2RO(self, vnfd, new_id=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. "_id" and "_admin" are removed and
    every vdu "cloud-init-file" is replaced by a "cloud-init" entry with the content of that file
    :param vnfd: input vnfd. It is not modified. Its ["_admin"]["storage"] "folder" and "pkg-dir" are needed
        only if some vdu contains "cloud-init-file"
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd
    :raises LcmException: if the storage reports an FsException reading a cloud-init file
    """
    vnfd_RO = deepcopy(vnfd)
    vnfd_RO.pop("_id", None)
    vnfd_RO.pop("_admin", None)
    if new_id:
        vnfd_RO["id"] = new_id
    try:
        # a descriptor without "vdu" has nothing to convert
        for vdu in vnfd_RO.get("vdu") or ():
            if "cloud-init-file" not in vdu:
                continue
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"]
            )
            ci_file = self.fs.file_open(cloud_init_file, "r")
            try:
                # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8.
                # If fails convert to base 64 or similar
                cloud_init_content = ci_file.read()
            finally:
                # closed also when reading fails
                ci_file.close()
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_content
        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e)) from e
468
def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, vnf_member_index, task=None):
    """
    Callback both for charm status change and task completion. Updates the VCA status stored at
    db_nsr["_admin"]["deployed"]["VCA"], derives from all the VCA entries the global config-status of the ns
    and the state of the operation, and stores db_nsr and db_nslcmop at database. Exceptions are logged and
    never propagated
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param db_nsr: nsr database content
    :param db_nslcmop: nslcmop database content
    :param vnf_member_index: NSD vnf-member-index
    :param task: None for charm status change, or task for completion task callback
    :return: None
    """
    nsr_id = None
    nslcmop_id = None
    update_nsr = update_nslcmop = False
    try:
        nsr_id = db_nsr["_id"]
        nslcmop_id = db_nslcmop["_id"]
        nsr_lcm = db_nsr["_admin"]["deployed"]
        ns_action = db_nslcmop["lcmOperationType"]
        logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
                                                                               vnf_member_index)

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                # TODO update db_nslcmop
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    if ns_action in ("instantiate", "terminate"):
                        # mark this VCA as failed and continue below to compute the global status
                        nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                        nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                    elif ns_action == "action":
                        db_nslcmop["operationState"] = "FAILED"
                        db_nslcmop["detailed-status"] = str(exc)
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                        return

                else:
                    self.logger.debug(logging_text + " task Done")
                    # TODO revise with Adam if action is finished and ok when task is done
                    if ns_action == "action":
                        db_nslcmop["operationState"] = "COMPLETED"
                        db_nslcmop["detailed-status"] = "Done"
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={}".format(status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = status
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return

        all_active = True
        status_map = {}  # number of VCA entries at each operational-status
        n2vc_error_text = []   # contain text error list. If empty no one is in error status
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            # independent check: "error"/"blocked" are also different from "active", so chaining this
            # with elif would never collect any error
            if vca_status in ("error", "blocked"):
                n2vc_error_text.append("member_vnf_index={} {}: {}".format(vnf_index, vca_status,
                                                                           vca_info["detailed-status"]))

        if all_active:
            self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index))
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailed-status"] = "Done"
            db_nslcmop["statusEnteredTime"] = time()
        elif n2vc_error_text:
            db_nsr["config-status"] = "failed"
            error_text = "fail configuring " + ";".join(n2vc_error_text)
            db_nsr["detailed-status"] = error_text
            db_nslcmop["operationState"] = "FAILED_TEMP"
            db_nslcmop["detailed-status"] = error_text
            db_nslcmop["statusEnteredTime"] = time()
        else:
            # still in progress: summarize how many VCA are at each status
            cs = "configuring: " + ", ".join(
                "{}: {}".format(vca_status, num) for vca_status, num in status_map.items())
            db_nsr["config-status"] = cs
            db_nsr["detailed-status"] = cs
            db_nslcmop["detailed-status"] = cs
        update_nsr = update_nslcmop = True

    except Exception as e:
        self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
    finally:
        # runs also after the early returns above, so whatever was flagged gets stored
        try:
            if update_nslcmop:
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
            if update_nsr:
                self.update_db("nsrs", nsr_id, db_nsr)
        except Exception as e:
            self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
                vnf_member_index, e), exc_info=True)
593
def ns_params_2_RO(self, ns_params):
    """
    Builds the RO ns descriptor that corresponds to the OSM ns instantiate params
    :param ns_params: OSM instantiate params. "vimAccountId" is mandatory; "ssh-authorized-key", "vnf" and
        "vld" are optional
    :return: The RO ns descriptor, or None if ns_params is empty
    """
    if not ns_params:
        return None

    RO_id_of_vim = {}  # vim account id -> RO vim id; avoids reading the same account from db twice

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO one, checking that the vim is enabled
        if vim_account not in RO_id_of_vim:
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
            # if db_vim["_admin"]["operationalState"] == "PROCESSING":
            #     #TODO check if VIM is creating and wait
            if db_vim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("VIM={} is not available. operationalState={}".format(
                    vim_account, db_vim["_admin"]["operationalState"]))
            RO_id_of_vim[vim_account] = db_vim["_admin"]["deployed"]["RO"]
        return RO_id_of_vim[vim_account]

    RO_vnfs = {}
    RO_networks = {}
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
        "vnfs": RO_vnfs,
        "networks": RO_networks,
    }

    ssh_keys = ns_params.get("ssh-authorized-key")
    if ssh_keys:
        RO_ns_params["cloud-config"] = {"key-pairs": ssh_keys}

    # only the vnfs that select their own vim account are included
    for vnf in ns_params.get("vnf") or ():
        if "vimAccountId" not in vnf:
            continue
        RO_datacenter = vim_account_2_RO(vnf["vimAccountId"])
        RO_vnfs[vnf["member-vnf-index"]] = {"datacenter": RO_datacenter}

    # only the vlds that provide some of the parameters below are included
    for vld in ns_params.get("vld") or ():
        RO_vld = {}
        if "ip-profile" in vld:
            RO_vld["ip-profile"] = vld["ip-profile"]
        if "vim-network-name" in vld:
            vim_network = vld["vim-network-name"]
            if isinstance(vim_network, dict):
                # one site per vim account
                RO_vld["sites"] = [
                    {"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                    for vim_account, vim_net in vim_network.items()
                ]
            else:
                # a plain name: no datacenter is set
                RO_vld["sites"] = [{"netmap-use": vim_network}]
        if RO_vld:
            RO_networks[vld["name"]] = RO_vld
    return RO_ns_params
651
652 async def ns_instantiate(self, nsr_id, nslcmop_id):
653 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
654 self.logger.debug(logging_text + "Enter")
655 # get all needed from database
656 db_nsr = None
657 db_nslcmop = None
658 db_vnfr = {}
659 exc = None
660 step = "Getting nsr, nslcmop, RO_vims from db"
661 try:
662 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
663 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
664 nsd = db_nsr["nsd"]
665 nsr_name = db_nsr["name"] # TODO short-name??
666 needed_vnfd = {}
667 vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
668 for c_vnf in nsd["constituent-vnfd"]:
669 vnfd_id = c_vnf["vnfd-id-ref"]
670 vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
671 db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
672 if vnfd_id not in needed_vnfd:
673 step = "Getting vnfd={} from db".format(vnfd_id)
674 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
675
676 nsr_lcm = db_nsr["_admin"].get("deployed")
677 if not nsr_lcm:
678 nsr_lcm = db_nsr["_admin"]["deployed"] = {
679 "id": nsr_id,
680 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
681 "nsr_ip": {},
682 "VCA": {},
683 }
684 db_nsr["detailed-status"] = "creating"
685 db_nsr["operational-status"] = "init"
686
687 RO = ROclient.ROClient(self.loop, **self.ro_config)
688
689 # get vnfds, instantiate at RO
690 for vnfd_id, vnfd in needed_vnfd.items():
691 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
692 self.logger.debug(logging_text + step)
693 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
694
695 # look if present
696 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
697 if vnfd_list:
698 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
699 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
700 vnfd_id, vnfd_list[0]["uuid"]))
701 else:
702 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
703 desc = await RO.create("vnfd", descriptor=vnfd_RO)
704 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
705 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
706 self.update_db("nsrs", nsr_id, db_nsr)
707
708 # create nsd at RO
709 nsd_id = nsd["id"]
710 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
711 self.logger.debug(logging_text + step)
712
713 nsd_id_RO = nsd_id + "." + nsd_id[:200]
714 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
715 if nsd_list:
716 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
717 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
718 nsd_id, nsd_list[0]["uuid"]))
719 else:
720 nsd_RO = deepcopy(nsd)
721 nsd_RO["id"] = nsd_id_RO
722 nsd_RO.pop("_id", None)
723 nsd_RO.pop("_admin", None)
724 for c_vnf in nsd_RO["constituent-vnfd"]:
725 vnfd_id = c_vnf["vnfd-id-ref"]
726 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
727 desc = await RO.create("nsd", descriptor=nsd_RO)
728 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
729 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
730 self.update_db("nsrs", nsr_id, db_nsr)
731
732 # Crate ns at RO
733 # if present use it unless in error status
734 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
735 if RO_nsr_id:
736 try:
737 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
738 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
739 desc = await RO.show("ns", RO_nsr_id)
740 except ROclient.ROClientException as e:
741 if e.http_code != HTTPStatus.NOT_FOUND:
742 raise
743 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
744 if RO_nsr_id:
745 ns_status, ns_status_info = RO.check_ns_status(desc)
746 nsr_lcm["RO"]["nsr_status"] = ns_status
747 if ns_status == "ERROR":
748 step = db_nsr["detailed-status"] = "Deleting ns at RO"
749 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
750 await RO.delete("ns", RO_nsr_id)
751 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
752 if not RO_nsr_id:
753 step = db_nsr["detailed-status"] = "Creating ns at RO"
754 self.logger.debug(logging_text + step)
755 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
756 desc = await RO.create("ns", descriptor=RO_ns_params,
757 name=db_nsr["name"],
758 scenario=nsr_lcm["RO"]["nsd_id"])
759 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
760 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
761 nsr_lcm["RO"]["nsr_status"] = "BUILD"
762
763 self.update_db("nsrs", nsr_id, db_nsr)
764 # update VNFR vimAccount
765 step = "Updating VNFR vimAcccount"
766 for vnf_index, vnfr in db_vnfr.items():
767 if vnfr.get("vim-account-id"):
768 continue
769 if db_nsr["instantiate_params"].get("vnf") and db_nsr["instantiate_params"]["vnf"].get(vnf_index) \
770 and db_nsr["instantiate_params"]["vnf"][vnf_index].get("vimAccountId"):
771 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vnf"][vnf_index]["vimAccountId"]
772 else:
773 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
774 self.update_db("vnfrs", vnfr["_id"], vnfr)
775
776 # wait until NS is ready
777 step = ns_status_detailed = "Waiting ns ready at RO"
778 db_nsr["detailed-status"] = ns_status_detailed
779 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
780 deployment_timeout = 2*3600 # Two hours
781 while deployment_timeout > 0:
782 desc = await RO.show("ns", RO_nsr_id)
783 ns_status, ns_status_info = RO.check_ns_status(desc)
784 nsr_lcm["RO"]["nsr_status"] = ns_status
785 if ns_status == "ERROR":
786 raise ROclient.ROClientException(ns_status_info)
787 elif ns_status == "BUILD":
788 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
789 self.update_db("nsrs", nsr_id, db_nsr)
790 elif ns_status == "ACTIVE":
791 step = "Getting ns VIM information"
792 ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
793 break
794 else:
795 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
796
797 await asyncio.sleep(5, loop=self.loop)
798 deployment_timeout -= 5
799 if deployment_timeout <= 0:
800 raise ROclient.ROClientException("Timeout waiting ns to be ready")
801 step = "Updating VNFRs"
802 for vnf_index, vnfr_deployed in ns_RO_info.items():
803 vnfr = db_vnfr[vnf_index]
804 vnfr["ip-address"] = vnfr_deployed.get("ip_address")
805 for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
806 for vdur in vnfr["vdur"]:
807 if vdur["vdu-id-ref"] == vdu_id:
808 vdur["vim-id"] = vdu_deployed.get("vim_id")
809 vdur["ip-address"] = vdu_deployed.get("ip_address")
810 break
811 self.update_db("vnfrs", vnfr["_id"], vnfr)
812
813 db_nsr["detailed-status"] = "Configuring vnfr"
814 self.update_db("nsrs", nsr_id, db_nsr)
815
816 # The parameters we'll need to deploy a charm
817 number_to_configure = 0
818
819 def deploy():
820 """An inner function to deploy the charm from either vnf or vdu
821 """
822
823 # Login to the VCA.
824 # if number_to_configure == 0:
825 # self.logger.debug("Logging into N2VC...")
826 # task = asyncio.ensure_future(self.n2vc.login())
827 # yield from asyncio.wait_for(task, 30.0)
828 # self.logger.debug("Logged into N2VC!")
829
830 ## await self.n2vc.login()
831
832 # Note: The charm needs to exist on disk at the location
833 # specified by charm_path.
834 base_folder = vnfd["_admin"]["storage"]
835 storage_params = self.fs.get_params()
836 charm_path = "{}{}/{}/charms/{}".format(
837 storage_params["path"],
838 base_folder["folder"],
839 base_folder["pkg-dir"],
840 proxy_charm
841 )
842
843 # Setup the runtime parameters for this VNF
844 params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
845
846 # ns_name will be ignored in the current version of N2VC
847 # but will be implemented for the next point release.
848 model_name = 'default'
849 application_name = self.n2vc.FormatApplicationName(
850 nsr_name,
851 vnf_index,
852 vnfd['name'],
853 )
854
855 nsr_lcm["VCA"][vnf_index] = {
856 "model": model_name,
857 "application": application_name,
858 "operational-status": "init",
859 "detailed-status": "",
860 "vnfd_id": vnfd_id,
861 }
862
863 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
864 task = asyncio.ensure_future(
865 self.n2vc.DeployCharms(
866 model_name, # The network service name
867 application_name, # The application name
868 vnfd, # The vnf descriptor
869 charm_path, # Path to charm
870 params, # Runtime params, like mgmt ip
871 {}, # for native charms only
872 self.n2vc_callback, # Callback for status changes
873 db_nsr, # Callback parameter
874 db_nslcmop,
875 vnf_index, # Callback parameter
876 None, # Callback parameter (task)
877 )
878 )
879 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
880 db_nsr, db_nslcmop, vnf_index))
881 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
882
883 # TODO: Make this call inside deploy()
884 # Login to the VCA. If there are multiple calls to login(),
885 # subsequent calls will be a nop and return immediately.
886 await self.n2vc.login()
887
888 step = "Looking for needed vnfd to configure"
889 self.logger.debug(logging_text + step)
890 for c_vnf in nsd["constituent-vnfd"]:
891 vnfd_id = c_vnf["vnfd-id-ref"]
892 vnf_index = str(c_vnf["member-vnf-index"])
893 vnfd = needed_vnfd[vnfd_id]
894
895 # Check if this VNF has a charm configuration
896 vnf_config = vnfd.get("vnf-configuration")
897
898 if vnf_config and vnf_config.get("juju"):
899 proxy_charm = vnf_config["juju"]["charm"]
900 params = {}
901
902 if proxy_charm:
903 if 'initial-config-primitive' in vnf_config:
904 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
905
906 deploy()
907 number_to_configure += 1
908
909 # Deploy charms for each VDU that supports one.
910 for vdu in vnfd['vdu']:
911 vdu_config = vdu.get('vdu-configuration')
912 proxy_charm = None
913 params = {}
914
915 if vdu_config and vdu_config.get("juju"):
916 proxy_charm = vdu_config["juju"]["charm"]
917
918 if 'initial-config-primitive' in vdu_config:
919 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
920
921 if proxy_charm:
922 deploy()
923 number_to_configure += 1
924
925 if number_to_configure:
926 db_nsr["config-status"] = "configuring"
927 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
928 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
929 else:
930 db_nslcmop["operationState"] = "COMPLETED"
931 db_nslcmop["detailed-status"] = "done"
932 db_nsr["config-status"] = "configured"
933 db_nsr["detailed-status"] = "done"
934 db_nsr["operational-status"] = "running"
935 self.update_db("nsrs", nsr_id, db_nsr)
936 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
937 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
938 return nsr_lcm
939
940 except (ROclient.ROClientException, DbException, LcmException) as e:
941 self.logger.error(logging_text + "Exit Exception {}".format(e))
942 exc = e
943 except Exception as e:
944 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
945 exc = e
946 finally:
947 if exc:
948 if db_nsr:
949 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
950 db_nsr["operational-status"] = "failed"
951 self.update_db("nsrs", nsr_id, db_nsr)
952 if db_nslcmop:
953 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
954 db_nslcmop["operationState"] = "FAILED"
955 db_nslcmop["statusEnteredTime"] = time()
956 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
957
async def ns_terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: launch the removal of its charms at VCA, delete the ns, nsd and vnfds
    at RO, wait for the charm removal tasks and finally write the result at database
    :param nsr_id: _id of the entry at "nsrs" collection
    :param nslcmop_id: _id of the entry at "nslcmops" collection that describes this operation
    :return: None. The result is stored at "nsrs" and "nslcmops" (or the entries are deleted if the
        operation was requested with "autoremove" and there were no errors)
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    # both are read at 'finally' on every exit path, so they must exist before entering 'try'
    db_nsr_update = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop from db"
    failed_detail = []   # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # TODO ALF remove
        # db_vim = self.db.get_one("vim_accounts", {"_id":  db_nsr["datacenter"]})
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update = {
            "operational-status": "terminating",
            "config-status": "terminating",
            "detailed-status": "Deleting charms",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Launch charm removal in background; it is awaited after the RO deletion.
        # Any error here is only logged: RO resources must be deleted anyway
        step = "Deleting charms"
        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                            # self.n2vc_callback,
                            # db_nsr,
                            # db_nslcmop,
                            # vnf_index,
                        )
                    )
                    vca_task_list.append(task)
                    vca_task_dict[vnf_index] = task
                    # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
                    #                                          deploy_info['application'], None, db_nsr,
                    #                                          db_nslcmop, vnf_index))
                    self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Wait for the charm removal tasks launched above
        if vca_task_list:
            await asyncio.wait(vca_task_list, timeout=300)
        for vnf_index, task in vca_task_dict.items():
            if task.cancelled():
                failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
            elif task.done():
                # a distinct name is used so that the method-level 'exc' (read at 'finally') is not
                # overwritten by the result of the last inspected task
                task_exc = task.exception()
                if task_exc:
                    failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, task_exc))
                else:
                    nsr_lcm["VCA"][vnf_index] = None
            else:  # timeout
                # TODO Should it be cancelled?!!
                task.cancel()
                failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))

        if failed_detail:
            self.logger.error(logging_text + "; ".join(failed_detail))
            db_nsr_update = {
                "operational-status": "failed",
                "detailed-status": "Deletion errors " + "; ".join(failed_detail),
                "_admin": {"deployed": nsr_lcm, }
            }
            db_nslcmop_update = {
                "detailed-status": "; ".join(failed_detail),
                "operationState": "FAILED",
                "statusEnteredTime": time()
            }
        elif db_nslcmop["operationParams"].get("autoremove"):
            # the entries are going to be deleted, so there is nothing left to update at 'finally'
            db_nsr_update = None
            db_nslcmop_update = None
            self.db.del_one("nsrs", {"_id": nsr_id})
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
        else:
            db_nsr_update = {
                "operational-status": "terminated",
                "detailed-status": "Done",
                "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
            }
            db_nslcmop_update = {
                "detailed-status": "Done",
                "operationState": "COMPLETED",
                "statusEnteredTime": time()
            }
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1133
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute a primitive over the charm deployed for one vnf of a network service and store the result
    at the "nslcmops" database entry
    :param nsr_id: _id of the entry at "nsrs" collection
    :param nslcmop_id: _id of the entry at "nslcmops" collection. Its "operationParams" must contain
        "vnf_member_index", "primitive" and "primitive_params"
    :return: None. "detailed-status", "operationState" and "statusEnteredTime" are written to database
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    step = "Getting information from database"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]

        # TODO check if ns is in a proper status
        step = "Checking deployed charm"
        vca_deployed = nsr_lcm["VCA"].get(vnf_index)
        if not vca_deployed:
            raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
                vnf_index, vca_deployed["operational-status"]))
        primitive = db_nslcmop["operationParams"]["primitive"]
        # a null value is treated as "no parameters"; '**None' would raise TypeError
        primitive_params = db_nslcmop["operationParams"]["primitive_params"] or {}
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, vnf_index, None]

        step = "Executing primitive"
        await self.n2vc.login()
        task = asyncio.ensure_future(
            self.n2vc.ExecutePrimitive(
                model_name,
                application_name,
                primitive, callback,
                *callback_args,
                **primitive_params
            )
        )
        # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
        #                                          db_nsr, db_nslcmop, vnf_index))
        # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
        # wait until completed with timeout
        await asyncio.wait((task,), timeout=300)

        result = "FAILED"  # by default
        result_detail = ""
        if task.cancelled():
            # reported through result_detail, which is what gets written to database
            result_detail = "Task has been cancelled"
        elif task.done():
            # a distinct name is used so that the method-level 'exc' (read at 'finally') is not
            # overwritten; otherwise 'finally' would replace this report
            task_exc = task.exception()
            if task_exc:
                result_detail = str(task_exc)
            else:
                self.logger.debug(logging_text + " task Done")
                # TODO revise with Adam if action is finished and ok when task is done or callback is needed
                result = "COMPLETED"
                result_detail = "Done"
        else:  # timeout
            # TODO Should it be cancelled?!!
            task.cancel()
            result_detail = "timeout"

        db_nslcmop_update = {
            "detailed-status": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1221
async def test(self, param=None):
    """
    Dummy task used to check that tasks can be launched; it only writes a debug log entry
    :param param: any value, it is included in the log message
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
1224
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified by _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id:  nsr, vim or sdn identity
    :return: None
    :raises ValueError: if topic is not one of the above
    """
    if topic == "ns":
        lcm_tasks = self.lcm_ns_tasks
    elif topic == "vim_account":
        lcm_tasks = self.lcm_vim_tasks
    elif topic == "sdn":
        lcm_tasks = self.lcm_sdn_tasks
    else:
        # without this branch an unknown topic failed later with an UnboundLocalError
        raise ValueError("unknown topic '{}' for cancelling tasks".format(topic))

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() returns False when the task had already finished; only real cancellations are logged
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
1247
async def kafka_ping(self):
    """
    Periodically write a ping message addressed to LCM itself at "admin" topic and check that pings
    are received back (kafka_read sets self.pings_not_received to 0 when it reads one).
    Pings are sent every 5 seconds until the first one is received back, and every 120 seconds afterwards
    :return: never returns normally
    :raises LcmException: if more than 10 consecutive pings have not been received back
    :raises Exception: the last writing error, after too many consecutive writing errors
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # time between pings are low when it is not received and at starting
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # 'loop' argument is not passed: it is not needed inside a coroutine and it was removed
            # from asyncio.sleep in python 3.10
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # if first_start it is the first time after starting. So leave more time and wait
            # to allow kafka starts.
            # The parentheses are needed: without them the condition was parsed as
            # '(consecutive_errors == 8) if not first_start else 30', which is always true at first start
            if consecutive_errors == (8 if not first_start else 30):
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 1 if not first_start else 5
            await asyncio.sleep(wait_time)
1279
async def kafka_read(self):
    """
    Main loop: read messages from topics "admin", "ns", "vim_account" and "sdn" and launch the
    corresponding task for each command. Launched tasks are annotated at self.lcm_ns_tasks,
    self.lcm_vim_tasks or self.lcm_sdn_tasks in order to be able to cancel them.
    The loop ends when command "exit" is received or after 10 consecutive errors
    :return: None
    :raises Exception: the last error, when the limit of consecutive errors is reached
    """
    self.logger.debug("Task kafka_read Enter")
    order_id = 1
    # future = asyncio.Future()
    consecutive_errors = 0
    first_start = True
    topics = ("admin", "ns", "vim_account", "sdn")
    while consecutive_errors < 10:
        try:
            topic, command, params = await self.msg.aioread(topics, self.loop)
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            first_start = False
            order_id += 1

            # commands valid for any topic
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
                continue

            if topic == "admin":
                if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                    # own ping received back; kafka_ping checks this counter
                    self.pings_not_received = 0
                # any other admin message is ignored
                continue
            elif topic == "ns":
                if command in ("instantiate", "terminate", "action"):
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    if command == "instantiate":
                        # self.logger.debug("Deploying NS {}".format(nsr_id))
                        coro = self.ns_instantiate(nsr_id, nslcmop_id)
                    elif command == "terminate":
                        # self.logger.debug("Deleting NS {}".format(nsr_id))
                        self.cancel_tasks(topic, nsr_id)
                        coro = self.ns_terminate(nsr_id, nslcmop_id)
                    else:
                        # self.logger.debug("Update NS {}".format(nsr_id))
                        coro = self.ns_action(nsr_id, nslcmop_id)
                    task = asyncio.ensure_future(coro)
                    # task name is "ns_instantiate", "ns_terminate" or "ns_action"
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_" + command: task}
                    continue
                elif command == "show":
                    # NOTE(review): nsr_id was used here without being assigned. It is assumed that
                    # params carries the nsr identity, directly or at "_id" - confirm with the sender
                    nsr_id = params.get("_id") if isinstance(params, dict) else params
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}\n    detailed-status: "
                            "{}\n    deploy: {}\n    tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
                elif command == "deleted":
                    continue  # TODO cleaning of task just in case should be done
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    task = asyncio.ensure_future(self.vim_create(params, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_create": task}
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_delete": task}
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    task = asyncio.ensure_future(self.vim_edit(vim_id, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_edit": task}
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    task = asyncio.ensure_future(self.sdn_create(params, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_create": task}
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_delete": task}
                    continue
                elif command == "edit":
                    task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_edit": task}
                    continue
            # reached when the command is not known for the topic
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            # if first_start it is the first time after starting. So leave more time and wait
            # to allow kafka starts.
            # The parentheses are needed: without them the condition was parsed as
            # '(consecutive_errors == 8) if not first_start else 30', which is always true at first start.
            # NOTE(review): the 'while' condition ends the loop at 10 consecutive errors, so the limit
            # of 30 can never be reached; confirm which one is intended
            if consecutive_errors == (8 if not first_start else 30):
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 2 if not first_start else 5
            # 'loop' argument is not passed: it is not needed inside a coroutine and it was removed
            # from asyncio.sleep in python 3.10
            await asyncio.sleep(wait_time)

    # self.logger.debug("Task kafka_read terminating")
    self.logger.debug("Task kafka_read exit")
1419
def start(self):
    """
    Run kafka_read and kafka_ping tasks until both finish or one of them raises, and then close
    the event loop and disconnect from database, message bus and file storage
    :return: None
    :raises Exception: the exception raised by any of the two tasks. Cleanup is done anyway
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.cancel_tasks("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.cancel_tasks("ALL", "ALL")
    finally:
        # kafka_ping only leaves its loop by raising, so the cleanup must also run when
        # run_until_complete exits with an exception
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1444
1445
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and override it with the environment variables whose
    name starts with "OSMLCM_". The variable name is lowercased and split by "_": the intermediate
    items select the (already existing) section, where "ro" and "vca" are converted to capital letters,
    and the last item is the key to set. For example OSMLCM_RO_HOST sets conf["RO"]["host"].
    Values are stored as strings, except keys named "port" that are converted to integer
    :param config_file: path of the yaml configuration file
    :return: dictionary with the configuration. The process exits with code 1 if the file cannot be loaded
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: a configuration file only needs plain yaml types, and yaml.load without
            # an explicit Loader is rejected by recent PyYAML versions
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                # a wrong variable (unknown section, non integer port, ...) is skipped, not fatal
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        sys.exit(1)
1475
1476
if __name__ == "__main__":
    # Script entry point: build the LCM from the configuration file found at the
    # working directory and run it until its tasks finish
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)
    lcm.start()