LWB Added vnfR support
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 from n2vc import version as N2VC_version
22 # import os.path
23 # import time
24
25 from copy import deepcopy
26 from http import HTTPStatus
27 from time import time
28
29
30 class LcmException(Exception):
31 pass
32
33
34 class Lcm:
35
36 def __init__(self, config_file):
37 """
38 Init, Connect to database, filesystem storage, and messaging
39 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
40 :return: None
41 """
42
43 self.db = None
44 self.msg = None
45 self.fs = None
46 self.pings_not_received = 1
47
48 # contains created tasks/futures to be able to cancel
49 self.lcm_ns_tasks = {}
50 self.lcm_vim_tasks = {}
51 self.lcm_sdn_tasks = {}
52 # logging
53 self.logger = logging.getLogger('lcm')
54 # load configuration
55 config = self.read_config_file(config_file)
56 self.config = config
57 self.ro_config={
58 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
59 "tenant": config.get("tenant", "osm"),
60 "logger_name": "lcm.ROclient",
61 "loglevel": "ERROR",
62 }
63
64 self.vca = config["VCA"] # TODO VCA
65 self.loop = None
66
67 # logging
68 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
69 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
70 config["database"]["logger_name"] = "lcm.db"
71 config["storage"]["logger_name"] = "lcm.fs"
72 config["message"]["logger_name"] = "lcm.msg"
73 if "logfile" in config["global"]:
74 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
75 maxBytes=100e6, backupCount=9, delay=0)
76 file_handler.setFormatter(log_formatter_simple)
77 self.logger.addHandler(file_handler)
78 else:
79 str_handler = logging.StreamHandler()
80 str_handler.setFormatter(log_formatter_simple)
81 self.logger.addHandler(str_handler)
82
83 if config["global"].get("loglevel"):
84 self.logger.setLevel(config["global"]["loglevel"])
85
86 # logging other modules
87 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
88 config[k1]["logger_name"] = logname
89 logger_module = logging.getLogger(logname)
90 if "logfile" in config[k1]:
91 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
92 maxBytes=100e6, backupCount=9, delay=0)
93 file_handler.setFormatter(log_formatter_simple)
94 logger_module.addHandler(file_handler)
95 if "loglevel" in config[k1]:
96 logger_module.setLevel(config[k1]["loglevel"])
97
98 self.n2vc = N2VC(
99 log=self.logger,
100 server=config['VCA']['host'],
101 port=config['VCA']['port'],
102 user=config['VCA']['user'],
103 secret=config['VCA']['secret'],
104 # TODO: This should point to the base folder where charms are stored,
105 # if there is a common one (like object storage). Otherwise, leave
106 # it unset and pass it via DeployCharms
107 # artifacts=config['VCA'][''],
108 artifacts=None,
109 )
110 # check version of N2VC
111 # TODO enhance with int conversion or from distutils.version import LooseVersion
112 # or with list(map(int, version.split(".")))
113 if N2VC_version < "0.0.2":
114 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
115 try:
116 if config["database"]["driver"] == "mongo":
117 self.db = dbmongo.DbMongo()
118 self.db.db_connect(config["database"])
119 elif config["database"]["driver"] == "memory":
120 self.db = dbmemory.DbMemory()
121 self.db.db_connect(config["database"])
122 else:
123 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
124 config["database"]["driver"]))
125
126 if config["storage"]["driver"] == "local":
127 self.fs = fslocal.FsLocal()
128 self.fs.fs_connect(config["storage"])
129 else:
130 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
131 config["storage"]["driver"]))
132
133 if config["message"]["driver"] == "local":
134 self.msg = msglocal.MsgLocal()
135 self.msg.connect(config["message"])
136 elif config["message"]["driver"] == "kafka":
137 self.msg = msgkafka.MsgKafka()
138 self.msg.connect(config["message"])
139 else:
140 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
141 config["storage"]["driver"]))
142 except (DbException, FsException, MsgException) as e:
143 self.logger.critical(str(e), exc_info=True)
144 raise LcmException(str(e))
145
146 def update_db(self, item, _id, _desc):
147 try:
148 self.db.replace(item, _id, _desc)
149 except DbException as e:
150 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
151
152 def update_db_2(self, item, _id, _desc):
153 try:
154 self.db.set_one(item, {"_id": _id}, _desc)
155 except DbException as e:
156 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
157
158 async def vim_create(self, vim_content, order_id):
159 vim_id = vim_content["_id"]
160 logging_text = "Task vim_create={} ".format(vim_id)
161 self.logger.debug(logging_text + "Enter")
162 db_vim = None
163 exc = None
164 try:
165 step = "Getting vim from db"
166 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
167 if "_admin" not in db_vim:
168 db_vim["_admin"] = {}
169 if "deployed" not in db_vim["_admin"]:
170 db_vim["_admin"]["deployed"] = {}
171 db_vim["_admin"]["deployed"]["RO"] = None
172
173 step = "Creating vim at RO"
174 RO = ROclient.ROClient(self.loop, **self.ro_config)
175 vim_RO = deepcopy(vim_content)
176 vim_RO.pop("_id", None)
177 vim_RO.pop("_admin", None)
178 vim_RO.pop("schema_version", None)
179 vim_RO.pop("schema_type", None)
180 vim_RO.pop("vim_tenant_name", None)
181 vim_RO["type"] = vim_RO.pop("vim_type")
182 vim_RO.pop("vim_user", None)
183 vim_RO.pop("vim_password", None)
184 desc = await RO.create("vim", descriptor=vim_RO)
185 RO_vim_id = desc["uuid"]
186 db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
187 self.update_db("vim_accounts", vim_id, db_vim)
188
189 step = "Attach vim to RO tenant"
190 vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
191 "vim_username": vim_content["vim_user"],
192 "vim_password": vim_content["vim_password"],
193 "config": vim_content["config"]
194 }
195 desc = await RO.attach_datacenter(RO_vim_id , descriptor=vim_RO)
196 db_vim["_admin"]["operationalState"] = "ENABLED"
197 self.update_db("vim_accounts", vim_id, db_vim)
198
199 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
200 return RO_vim_id
201
202 except (ROclient.ROClientException, DbException) as e:
203 self.logger.error(logging_text + "Exit Exception {}".format(e))
204 exc = e
205 except Exception as e:
206 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
207 exc = e
208 finally:
209 if exc and db_vim:
210 db_vim["_admin"]["operationalState"] = "ERROR"
211 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
212 self.update_db("vim_accounts", vim_id, db_vim)
213
214 async def vim_edit(self, vim_content, order_id):
215 vim_id = vim_content["_id"]
216 logging_text = "Task vim_edit={} ".format(vim_id)
217 self.logger.debug(logging_text + "Enter")
218 db_vim = None
219 exc = None
220 step = "Getting vim from db"
221 try:
222 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
223 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
224 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
225 step = "Editing vim at RO"
226 RO = ROclient.ROClient(self.loop, **self.ro_config)
227 vim_RO = deepcopy(vim_content)
228 vim_RO.pop("_id", None)
229 vim_RO.pop("_admin", None)
230 vim_RO.pop("schema_version", None)
231 vim_RO.pop("schema_type", None)
232 vim_RO.pop("vim_tenant_name", None)
233 vim_RO["type"] = vim_RO.pop("vim_type")
234 vim_RO.pop("vim_user", None)
235 vim_RO.pop("vim_password", None)
236 if vim_RO:
237 desc = await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
238
239 step = "Editing vim-account at RO tenant"
240 vim_RO = {}
241 for k in ("vim_tenant_name", "vim_password", "config"):
242 if k in vim_content:
243 vim_RO[k] = vim_content[k]
244 if "vim_user" in vim_content:
245 vim_content["vim_username"] = vim_content["vim_user"]
246 if vim_RO:
247 desc = await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
248 db_vim["_admin"]["operationalState"] = "ENABLED"
249 self.update_db("vim_accounts", vim_id, db_vim)
250
251 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
252 return RO_vim_id
253
254 except (ROclient.ROClientException, DbException) as e:
255 self.logger.error(logging_text + "Exit Exception {}".format(e))
256 exc = e
257 except Exception as e:
258 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
259 exc = e
260 finally:
261 if exc and db_vim:
262 db_vim["_admin"]["operationalState"] = "ERROR"
263 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
264 self.update_db("vim_accounts", vim_id, db_vim)
265
266 async def vim_delete(self, vim_id, order_id):
267 logging_text = "Task vim_delete={} ".format(vim_id)
268 self.logger.debug(logging_text + "Enter")
269 db_vim = None
270 exc = None
271 step = "Getting vim from db"
272 try:
273 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
274 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
275 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
276 RO = ROclient.ROClient(self.loop, **self.ro_config)
277 step = "Detaching vim from RO tenant"
278 try:
279 await RO.detach_datacenter(RO_vim_id)
280 except ROclient.ROClientException as e:
281 if e.http_code == 404: # not found
282 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
283 else:
284 raise
285
286 step = "Deleting vim from RO"
287 try:
288 await RO.delete("vim", RO_vim_id)
289 except ROclient.ROClientException as e:
290 if e.http_code == 404: # not found
291 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
292 else:
293 raise
294 else:
295 # nothing to delete
296 self.logger.error(logging_text + "Skipping. There is not RO information at database")
297 self.db.del_one("vim_accounts", {"_id": vim_id})
298 self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
299 return None
300
301 except (ROclient.ROClientException, DbException) as e:
302 self.logger.error(logging_text + "Exit Exception {}".format(e))
303 exc = e
304 except Exception as e:
305 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
306 exc = e
307 finally:
308 if exc and db_vim:
309 db_vim["_admin"]["operationalState"] = "ERROR"
310 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
311 self.update_db("vim_accounts", vim_id, db_vim)
312
313 async def sdn_create(self, sdn_content, order_id):
314 sdn_id = sdn_content["_id"]
315 logging_text = "Task sdn_create={} ".format(sdn_id)
316 self.logger.debug(logging_text + "Enter")
317 db_sdn = None
318 exc = None
319 try:
320 step = "Getting sdn from db"
321 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
322 if "_admin" not in db_sdn:
323 db_sdn["_admin"] = {}
324 if "deployed" not in db_sdn["_admin"]:
325 db_sdn["_admin"]["deployed"] = {}
326 db_sdn["_admin"]["deployed"]["RO"] = None
327
328 step = "Creating sdn at RO"
329 RO = ROclient.ROClient(self.loop, **self.ro_config)
330 sdn_RO = deepcopy(sdn_content)
331 sdn_RO.pop("_id", None)
332 sdn_RO.pop("_admin", None)
333 sdn_RO.pop("schema_version", None)
334 sdn_RO.pop("schema_type", None)
335 desc = await RO.create("sdn", descriptor=sdn_RO)
336 RO_sdn_id = desc["uuid"]
337 db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
338 db_sdn["_admin"]["operationalState"] = "ENABLED"
339 self.update_db("sdns", sdn_id, db_sdn)
340 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
341 return RO_sdn_id
342
343 except (ROclient.ROClientException, DbException) as e:
344 self.logger.error(logging_text + "Exit Exception {}".format(e))
345 exc = e
346 except Exception as e:
347 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
348 exc = e
349 finally:
350 if exc and db_sdn:
351 db_sdn["_admin"]["operationalState"] = "ERROR"
352 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
353 self.update_db("sdns", sdn_id, db_sdn)
354
355 async def sdn_edit(self, sdn_content, order_id):
356 sdn_id = sdn_content["_id"]
357 logging_text = "Task sdn_edit={} ".format(sdn_id)
358 self.logger.debug(logging_text + "Enter")
359 db_sdn = None
360 exc = None
361 step = "Getting sdn from db"
362 try:
363 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
364 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
365 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
366 RO = ROclient.ROClient(self.loop, **self.ro_config)
367 step = "Editing sdn at RO"
368 sdn_RO = deepcopy(sdn_content)
369 sdn_RO.pop("_id", None)
370 sdn_RO.pop("_admin", None)
371 sdn_RO.pop("schema_version", None)
372 sdn_RO.pop("schema_type", None)
373 if sdn_RO:
374 desc = await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
375 db_sdn["_admin"]["operationalState"] = "ENABLED"
376 self.update_db("sdns", sdn_id, db_sdn)
377
378 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
379 return RO_sdn_id
380
381 except (ROclient.ROClientException, DbException) as e:
382 self.logger.error(logging_text + "Exit Exception {}".format(e))
383 exc = e
384 except Exception as e:
385 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
386 exc = e
387 finally:
388 if exc and db_sdn:
389 db_sdn["_admin"]["operationalState"] = "ERROR"
390 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
391 self.update_db("sdns", sdn_id, db_sdn)
392
393 async def sdn_delete(self, sdn_id, order_id):
394 logging_text = "Task sdn_delete={} ".format(sdn_id)
395 self.logger.debug(logging_text + "Enter")
396 db_sdn = None
397 exc = None
398 step = "Getting sdn from db"
399 try:
400 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
401 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
402 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
403 RO = ROclient.ROClient(self.loop, **self.ro_config)
404 step = "Deleting sdn from RO"
405 try:
406 await RO.delete("sdn", RO_sdn_id)
407 except ROclient.ROClientException as e:
408 if e.http_code == 404: # not found
409 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
410 else:
411 raise
412 else:
413 # nothing to delete
414 self.logger.error(logging_text + "Skipping. There is not RO information at database")
415 self.db.del_one("sdns", {"_id": sdn_id})
416 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
417 return None
418
419 except (ROclient.ROClientException, DbException) as e:
420 self.logger.error(logging_text + "Exit Exception {}".format(e))
421 exc = e
422 except Exception as e:
423 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
424 exc = e
425 finally:
426 if exc and db_sdn:
427 db_sdn["_admin"]["operationalState"] = "ERROR"
428 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
429 self.update_db("sdns", sdn_id, db_sdn)
430
431 def vnfd2RO(self, vnfd, new_id=None):
432 """
433 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
434 :param vnfd: input vnfd
435 :param new_id: overrides vnf id if provided
436 :return: copy of vnfd
437 """
438 ci_file = None
439 try:
440 vnfd_RO = deepcopy(vnfd)
441 vnfd_RO.pop("_id", None)
442 vnfd_RO.pop("_admin", None)
443 if new_id:
444 vnfd_RO["id"] = new_id
445 for vdu in vnfd_RO["vdu"]:
446 if "cloud-init-file" in vdu:
447 base_folder = vnfd["_admin"]["storage"]
448 clout_init_file = "{}/{}/cloud_init/{}".format(
449 base_folder["folder"],
450 base_folder["pkg-dir"],
451 vdu["cloud-init-file"]
452 )
453 ci_file = self.fs.file_open(clout_init_file, "r")
454 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert to base 64 or similar
455 clout_init_content = ci_file.read()
456 ci_file.close()
457 ci_file = None
458 vdu.pop("cloud-init-file", None)
459 vdu["cloud-init"] = clout_init_content
460 return vnfd_RO
461 except FsException as e:
462 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
463 finally:
464 if ci_file:
465 ci_file.close()
466
467 def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, vnf_member_index, task=None):
468 """
469 Callback both for charm status change and task completion
470 :param model_name: Charm model name
471 :param application_name: Charm application name
472 :param status: Can be
473 - blocked: The unit needs manual intervention
474 - maintenance: The unit is actively deploying/configuring
475 - waiting: The unit is waiting for another charm to be ready
476 - active: The unit is deployed, configured, and ready
477 - error: The charm has failed and needs attention.
478 - terminated: The charm has been destroyed
479 - removing,
480 - removed
481 :param message: detailed message error
482 :param db_nsr: nsr database content
483 :param db_nslcmop: nslcmop database content
484 :param vnf_member_index: NSD vnf-member-index
485 :param task: None for charm status change, or task for completion task callback
486 :return:
487 """
488 nsr_id = None
489 nslcmop_id = None
490 update_nsr = update_nslcmop = False
491 try:
492 nsr_id = db_nsr["_id"]
493 nslcmop_id = db_nslcmop["_id"]
494 nsr_lcm = db_nsr["_admin"]["deployed"]
495 ns_action = db_nslcmop["lcmOperationType"]
496 logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
497 vnf_member_index)
498
499 if task:
500 if task.cancelled():
501 self.logger.debug(logging_text + " task Cancelled")
502 # TODO update db_nslcmop
503 return
504
505 if task.done():
506 exc = task.exception()
507 if exc:
508 self.logger.error(logging_text + " task Exception={}".format(exc))
509 if ns_action in ("instantiate", "terminate"):
510 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
511 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
512 elif ns_action == "action":
513 db_nslcmop["operationState"] = "FAILED"
514 db_nslcmop["detailed-status"] = str(exc)
515 db_nslcmop["statusEnteredTime"] = time()
516 update_nslcmop = True
517 return
518
519 else:
520 self.logger.debug(logging_text + " task Done")
521 # TODO revise with Adam if action is finished and ok when task is done
522 if ns_action == "action":
523 db_nslcmop["operationState"] = "COMPLETED"
524 db_nslcmop["detailed-status"] = "Done"
525 db_nslcmop["statusEnteredTime"] = time()
526 update_nslcmop = True
527 # task is Done, but callback is still ongoing. So ignore
528 return
529 elif status:
530 self.logger.debug(logging_text + " Enter status={}".format(status))
531 if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == status:
532 return # same status, ignore
533 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = status
534 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(message)
535 else:
536 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
537 return
538
539 all_active = True
540 status_map = {}
541 n2vc_error_text = [] # contain text error list. If empty no one is in error status
542 for vnf_index, vca_info in nsr_lcm["VCA"].items():
543 vca_status = vca_info["operational-status"]
544 if vca_status not in status_map:
545 # Initialize it
546 status_map[vca_status] = 0
547 status_map[vca_status] += 1
548
549 if vca_status != "active":
550 all_active = False
551 elif vca_status in ("error", "blocked"):
552 n2vc_error_text.append("member_vnf_index={} {}: {}".format(vnf_member_index, vca_status,
553 vca_info["detailed-status"]))
554
555 if all_active:
556 self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index))
557 db_nsr["config-status"] = "configured"
558 db_nsr["detailed-status"] = "done"
559 db_nslcmop["operationState"] = "COMPLETED"
560 db_nslcmop["detailed-status"] = "Done"
561 db_nslcmop["statusEnteredTime"] = time()
562 elif n2vc_error_text:
563 db_nsr["config-status"] = "failed"
564 error_text = "fail configuring " + ";".join(n2vc_error_text)
565 db_nsr["detailed-status"] = error_text
566 db_nslcmop["operationState"] = "FAILED_TEMP"
567 db_nslcmop["detailed-status"] = error_text
568 db_nslcmop["statusEnteredTime"] = time()
569 else:
570 cs = "configuring: "
571 separator = ""
572 for status, num in status_map.items():
573 cs += separator + "{}: {}".format(status, num)
574 separator = ", "
575 db_nsr["config-status"] = cs
576 db_nsr["detailed-status"] = cs
577 db_nslcmop["detailed-status"] = cs
578 update_nsr = update_nslcmop = True
579
580 except Exception as e:
581 self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
582 finally:
583 try:
584 if update_nslcmop:
585 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
586 if update_nsr:
587 self.update_db("nsrs", nsr_id, db_nsr)
588 except Exception as e:
589 self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
590 vnf_member_index, e), exc_info=True)
591
592 def ns_params_2_RO(self, ns_params):
593 """
594 Creates a RO ns descriptor from OSM ns_instantite params
595 :param ns_params: OSM instantiate params
596 :return: The RO ns descriptor
597 """
598 vim_2_RO = {}
599 def vim_account_2_RO(vim_account):
600 if vim_account in vim_2_RO:
601 return vim_2_RO[vim_account]
602 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
603 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
604 # #TODO check if VIM is creating and wait
605 if db_vim["_admin"]["operationalState"] != "ENABLED":
606 raise LcmException("VIM={} is not available. operationalState={}".format(
607 vim_account, db_vim["_admin"]["operationalState"]))
608 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
609 vim_2_RO[vim_account] = RO_vim_id
610 return RO_vim_id
611
612 if not ns_params:
613 return None
614 RO_ns_params = {
615 # "name": ns_params["nsName"],
616 # "description": ns_params.get("nsDescription"),
617 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
618 # "scenario": ns_params["nsdId"],
619 "vnfs": {},
620 "networks": {},
621 }
622 if ns_params.get("ssh-authorized-key"):
623 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}
624 if ns_params.get("vnf"):
625 for vnf in ns_params["vnf"]:
626 RO_vnf = {}
627 if "vimAccountId" in vnf:
628 RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"])
629 if RO_vnf:
630 RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf
631 if ns_params.get("vld"):
632 for vld in ns_params["vld"]:
633 RO_vld = {}
634 if "ip-profile" in vld:
635 RO_vld["ip-profile"] = vld["ip-profile"]
636 if "vim-network-name" in vld:
637 RO_vld["sites"] = []
638 if isinstance(vld["vim-network-name"], dict):
639 for vim_account, vim_net in vld["vim-network-name"].items():
640 RO_vld["sites"].append({
641 "netmap-use": vim_net,
642 "datacenter": vim_account_2_RO(vim_account)
643 })
644 else: #isinstance str
645 RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]})
646 if RO_vld:
647 RO_ns_params["networks"][vld["name"]] = RO_vld
648 return RO_ns_params
649
650 async def ns_instantiate(self, nsr_id, nslcmop_id):
651 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
652 self.logger.debug(logging_text + "Enter")
653 # get all needed from database
654 db_nsr = None
655 db_nslcmop = None
656 db_vnfr = {}
657 exc = None
658 step = "Getting nsr, nslcmop, RO_vims from db"
659 try:
660 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
661 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
662 nsd = db_nsr["nsd"]
663 nsr_name = db_nsr["name"] # TODO short-name??
664 needed_vnfd = {}
665 vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
666 for c_vnf in nsd["constituent-vnfd"]:
667 vnfd_id = c_vnf["vnfd-id-ref"]
668 vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
669 db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
670 if vnfd_id not in needed_vnfd:
671 step = "Getting vnfd={} from db".format(vnfd_id)
672 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
673
674 nsr_lcm = db_nsr["_admin"].get("deployed")
675 if not nsr_lcm:
676 nsr_lcm = db_nsr["_admin"]["deployed"] = {
677 "id": nsr_id,
678 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
679 "nsr_ip": {},
680 "VCA": {},
681 }
682 db_nsr["detailed-status"] = "creating"
683 db_nsr["operational-status"] = "init"
684
685 RO = ROclient.ROClient(self.loop, **self.ro_config)
686
687 # get vnfds, instantiate at RO
688 for vnfd_id, vnfd in needed_vnfd.items():
689 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
690 self.logger.debug(logging_text + step)
691 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
692
693 # look if present
694 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
695 if vnfd_list:
696 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
697 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
698 vnfd_id, vnfd_list[0]["uuid"]))
699 else:
700 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
701 desc = await RO.create("vnfd", descriptor=vnfd_RO)
702 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
703 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
704 self.update_db("nsrs", nsr_id, db_nsr)
705
706 # create nsd at RO
707 nsd_id = nsd["id"]
708 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
709 self.logger.debug(logging_text + step)
710
711 nsd_id_RO = nsd_id + "." + nsd_id[:200]
712 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
713 if nsd_list:
714 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
715 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
716 nsd_id, nsd_list[0]["uuid"]))
717 else:
718 nsd_RO = deepcopy(nsd)
719 nsd_RO["id"] = nsd_id_RO
720 nsd_RO.pop("_id", None)
721 nsd_RO.pop("_admin", None)
722 for c_vnf in nsd_RO["constituent-vnfd"]:
723 vnfd_id = c_vnf["vnfd-id-ref"]
724 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
725 desc = await RO.create("nsd", descriptor=nsd_RO)
726 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
727 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
728 self.update_db("nsrs", nsr_id, db_nsr)
729
730 # Crate ns at RO
731 # if present use it unless in error status
732 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
733 if RO_nsr_id:
734 try:
735 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
736 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
737 desc = await RO.show("ns", RO_nsr_id)
738 except ROclient.ROClientException as e:
739 if e.http_code != HTTPStatus.NOT_FOUND:
740 raise
741 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
742 if RO_nsr_id:
743 ns_status, ns_status_info = RO.check_ns_status(desc)
744 nsr_lcm["RO"]["nsr_status"] = ns_status
745 if ns_status == "ERROR":
746 step = db_nsr["detailed-status"] = "Deleting ns at RO"
747 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
748 await RO.delete("ns", RO_nsr_id)
749 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
750 if not RO_nsr_id:
751 step = db_nsr["detailed-status"] = "Creating ns at RO"
752 self.logger.debug(logging_text + step)
753 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
754 desc = await RO.create("ns", descriptor=RO_ns_params,
755 name=db_nsr["name"],
756 scenario=nsr_lcm["RO"]["nsd_id"])
757 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
758 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
759 nsr_lcm["RO"]["nsr_status"] = "BUILD"
760
761 self.update_db("nsrs", nsr_id, db_nsr)
762 # update VNFR vimAccount
763 step = "Updating VNFR vimAcccount"
764 for vnf_index, vnfr in db_vnfr.items():
765 if vnfr.get("vim-account-id"):
766 continue
767 if db_nsr["instantiate_params"].get("vnf") and db_nsr["instantiate_params"]["vnf"].get(vnf_index) \
768 and db_nsr["instantiate_params"]["vnf"][vnf_index].get("vimAccountId"):
769 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vnf"][vnf_index]["vimAccountId"]
770 else:
771 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
772 self.update_db("vnfrs", vnfr["_id"], vnfr)
773
774 # wait until NS is ready
775 step = ns_status_detailed = "Waiting ns ready at RO"
776 db_nsr["detailed-status"] = ns_status_detailed
777 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
778 deployment_timeout = 2*3600 # Two hours
779 while deployment_timeout > 0:
780 desc = await RO.show("ns", RO_nsr_id)
781 ns_status, ns_status_info = RO.check_ns_status(desc)
782 nsr_lcm["RO"]["nsr_status"] = ns_status
783 if ns_status == "ERROR":
784 raise ROclient.ROClientException(ns_status_info)
785 elif ns_status == "BUILD":
786 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
787 self.update_db("nsrs", nsr_id, db_nsr)
788 elif ns_status == "ACTIVE":
789 step = "Getting ns VIM information"
790 ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
791 break
792 else:
793 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
794
795 await asyncio.sleep(5, loop=self.loop)
796 deployment_timeout -= 5
797 if deployment_timeout <= 0:
798 raise ROclient.ROClientException("Timeout waiting ns to be ready")
799 step = "Updating VNFRs"
800 for vnf_index, vnfr_deployed in ns_RO_info.items():
801 vnfr = db_vnfr[vnf_index]
802 vnfr["ip-address"] = vnfr_deployed.get("ip_address")
803 for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
804 for vdur in vnfr["vdur"]:
805 if vdur["vdu-id-ref"] == vdu_id:
806 vdur["vim-id"] = vdu_deployed.get("vim_id")
807 vdur["ip-address"] = vdu_deployed.get("ip_address")
808 break
809 self.update_db("vnfrs", vnfr["_id"], vnfr)
810
811 db_nsr["detailed-status"] = "Configuring vnfr"
812 self.update_db("nsrs", nsr_id, db_nsr)
813
814 # The parameters we'll need to deploy a charm
815 number_to_configure = 0
816
817 def deploy():
818 """An inner function to deploy the charm from either vnf or vdu
819 """
820
821 # Login to the VCA.
822 # if number_to_configure == 0:
823 # self.logger.debug("Logging into N2VC...")
824 # task = asyncio.ensure_future(self.n2vc.login())
825 # yield from asyncio.wait_for(task, 30.0)
826 # self.logger.debug("Logged into N2VC!")
827
828 ## await self.n2vc.login()
829
830 # Note: The charm needs to exist on disk at the location
831 # specified by charm_path.
832 base_folder = vnfd["_admin"]["storage"]
833 storage_params = self.fs.get_params()
834 charm_path = "{}{}/{}/charms/{}".format(
835 storage_params["path"],
836 base_folder["folder"],
837 base_folder["pkg-dir"],
838 proxy_charm
839 )
840
841 # Setup the runtime parameters for this VNF
842 params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
843
844 # ns_name will be ignored in the current version of N2VC
845 # but will be implemented for the next point release.
846 model_name = 'default'
847 application_name = self.n2vc.FormatApplicationName(
848 nsr_name,
849 vnf_index,
850 vnfd['name'],
851 )
852
853 nsr_lcm["VCA"][vnf_index] = {
854 "model": model_name,
855 "application": application_name,
856 "operational-status": "init",
857 "detailed-status": "",
858 "vnfd_id": vnfd_id,
859 }
860
861 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
862 task = asyncio.ensure_future(
863 self.n2vc.DeployCharms(
864 model_name, # The network service name
865 application_name, # The application name
866 vnfd, # The vnf descriptor
867 charm_path, # Path to charm
868 params, # Runtime params, like mgmt ip
869 {}, # for native charms only
870 self.n2vc_callback, # Callback for status changes
871 db_nsr, # Callback parameter
872 db_nslcmop,
873 vnf_index, # Callback parameter
874 None, # Callback parameter (task)
875 )
876 )
877 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
878 db_nsr, db_nslcmop, vnf_index))
879 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
880
881 # TODO: Make this call inside deploy()
882 # Login to the VCA. If there are multiple calls to login(),
883 # subsequent calls will be a nop and return immediately.
884 await self.n2vc.login()
885
886 step = "Looking for needed vnfd to configure"
887 self.logger.debug(logging_text + step)
888 for c_vnf in nsd["constituent-vnfd"]:
889 vnfd_id = c_vnf["vnfd-id-ref"]
890 vnf_index = str(c_vnf["member-vnf-index"])
891 vnfd = needed_vnfd[vnfd_id]
892
893 # Check if this VNF has a charm configuration
894 vnf_config = vnfd.get("vnf-configuration")
895
896 if vnf_config and vnf_config.get("juju"):
897 proxy_charm = vnf_config["juju"]["charm"]
898 params = {}
899
900 if proxy_charm:
901 if 'initial-config-primitive' in vnf_config:
902 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
903
904 deploy()
905 number_to_configure += 1
906
907 # Deploy charms for each VDU that supports one.
908 for vdu in vnfd['vdu']:
909 vdu_config = vdu.get('vdu-configuration')
910 proxy_charm = None
911 params = {}
912
913 if vdu_config and vdu_config.get("juju"):
914 proxy_charm = vdu_config["juju"]["charm"]
915
916 if 'initial-config-primitive' in vdu_config:
917 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
918
919 if proxy_charm:
920 deploy()
921 number_to_configure += 1
922
923 if number_to_configure:
924 db_nsr["config-status"] = "configuring"
925 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
926 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
927 else:
928 db_nslcmop["operationState"] = "COMPLETED"
929 db_nslcmop["detailed-status"] = "done"
930 db_nsr["config-status"] = "configured"
931 db_nsr["detailed-status"] = "done"
932 db_nsr["operational-status"] = "running"
933 self.update_db("nsrs", nsr_id, db_nsr)
934 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
935 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
936 return nsr_lcm
937
938 except (ROclient.ROClientException, DbException, LcmException) as e:
939 self.logger.error(logging_text + "Exit Exception {}".format(e))
940 exc = e
941 except Exception as e:
942 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
943 exc = e
944 finally:
945 if exc:
946 if db_nsr:
947 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
948 db_nsr["operational-status"] = "failed"
949 self.update_db("nsrs", nsr_id, db_nsr)
950 if db_nslcmop:
951 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
952 db_nslcmop["operationState"] = "FAILED"
953 db_nslcmop["statusEnteredTime"] = time()
954 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
955
956 async def ns_terminate(self, nsr_id, nslcmop_id):
957 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
958 self.logger.debug(logging_text + "Enter")
959 db_nsr = None
960 db_nslcmop = None
961 exc = None
962 step = "Getting nsr, nslcmop from db"
963 failed_detail = [] # annotates all failed error messages
964 vca_task_list = []
965 vca_task_dict = {}
966 try:
967 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
968 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
969 # nsd = db_nsr["nsd"]
970 nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
971 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
972 return
973 # TODO ALF remove
974 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
975 # #TODO check if VIM is creating and wait
976 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
977
978 db_nsr_update = {
979 "operational-status": "terminating",
980 "config-status": "terminating",
981 "detailed-status": "Deleting charms",
982 }
983 self.update_db_2("nsrs", nsr_id, db_nsr_update)
984
985 try:
986 self.logger.debug(logging_text + step)
987 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
988 if deploy_info and deploy_info.get("application"):
989 task = asyncio.ensure_future(
990 self.n2vc.RemoveCharms(
991 deploy_info['model'],
992 deploy_info['application'],
993 # self.n2vc_callback,
994 # db_nsr,
995 # db_nslcmop,
996 # vnf_index,
997 )
998 )
999 vca_task_list.append(task)
1000 vca_task_dict[vnf_index] = task
1001 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1002 # deploy_info['application'], None, db_nsr,
1003 # db_nslcmop, vnf_index))
1004 self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
1005 except Exception as e:
1006 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1007 # remove from RO
1008
1009 RO = ROclient.ROClient(self.loop, **self.ro_config)
1010 # Delete ns
1011 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1012 if RO_nsr_id:
1013 try:
1014 step = db_nsr["detailed-status"] = "Deleting ns at RO"
1015 self.logger.debug(logging_text + step)
1016 desc = await RO.delete("ns", RO_nsr_id)
1017 nsr_lcm["RO"]["nsr_id"] = None
1018 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1019 except ROclient.ROClientException as e:
1020 if e.http_code == 404: # not found
1021 nsr_lcm["RO"]["nsr_id"] = None
1022 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1023 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1024 elif e.http_code == 409: #conflict
1025 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1026 self.logger.debug(logging_text + failed_detail[-1])
1027 else:
1028 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1029 self.logger.error(logging_text + failed_detail[-1])
1030
1031 # Delete nsd
1032 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1033 if RO_nsd_id:
1034 try:
1035 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
1036 desc = await RO.delete("nsd", RO_nsd_id)
1037 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1038 nsr_lcm["RO"]["nsd_id"] = None
1039 except ROclient.ROClientException as e:
1040 if e.http_code == 404: # not found
1041 nsr_lcm["RO"]["nsd_id"] = None
1042 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1043 elif e.http_code == 409: #conflict
1044 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1045 self.logger.debug(logging_text + failed_detail[-1])
1046 else:
1047 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1048 self.logger.error(logging_text + failed_detail[-1])
1049
1050 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1051 if not RO_vnfd_id:
1052 continue
1053 try:
1054 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
1055 desc = await RO.delete("vnfd", RO_vnfd_id)
1056 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1057 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1058 except ROclient.ROClientException as e:
1059 if e.http_code == 404: # not found
1060 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1061 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1062 elif e.http_code == 409: #conflict
1063 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1064 self.logger.debug(logging_text + failed_detail[-1])
1065 else:
1066 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1067 self.logger.error(logging_text + failed_detail[-1])
1068
1069 if vca_task_list:
1070 await asyncio.wait(vca_task_list, timeout=300)
1071 for vnf_index, task in vca_task_dict.items():
1072 if task.cancelled():
1073 failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
1074 elif task.done():
1075 exc = task.exception()
1076 if exc:
1077 failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc))
1078 else:
1079 nsr_lcm["VCA"][vnf_index] = None
1080 else: # timeout
1081 # TODO Should it be cancelled?!!
1082 task.cancel()
1083 failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))
1084
1085 if failed_detail:
1086 self.logger.error(logging_text + " ;".join(failed_detail))
1087 db_nsr_update = {
1088 "operational-status": "failed",
1089 "detailed-status": "Deletion errors " + "; ".join(failed_detail),
1090 "_admin": {"deployed": nsr_lcm, }
1091 }
1092 db_nslcmop_update = {
1093 "detailed-status": "; ".join(failed_detail),
1094 "operationState": "FAILED",
1095 "statusEnteredTime": time()
1096 }
1097 elif db_nslcmop["operationParams"].get("autoremove"):
1098 self.db.del_one("nsrs", {"_id": nsr_id})
1099 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1100 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1101 else:
1102 db_nsr_update = {
1103 "operational-status": "terminated",
1104 "detailed-status": "Done",
1105 "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
1106 }
1107 db_nslcmop_update = {
1108 "detailed-status": "Done",
1109 "operationState": "COMPLETED",
1110 "statusEnteredTime": time()
1111 }
1112 self.logger.debug(logging_text + "Exit")
1113
1114 except (ROclient.ROClientException, DbException) as e:
1115 self.logger.error(logging_text + "Exit Exception {}".format(e))
1116 exc = e
1117 except Exception as e:
1118 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1119 exc = e
1120 finally:
1121 if exc and db_nslcmop:
1122 db_nslcmop_update = {
1123 "detailed-status": "FAILED {}: {}".format(step, exc),
1124 "operationState": "FAILED",
1125 "statusEnteredTime": time(),
1126 }
1127 if db_nslcmop_update:
1128 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1129 if db_nsr_update:
1130 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1131
1132 async def ns_action(self, nsr_id, nslcmop_id):
1133 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1134 self.logger.debug(logging_text + "Enter")
1135 # get all needed from database
1136 db_nsr = None
1137 db_nslcmop = None
1138 db_nslcmop_update = None
1139 exc = None
1140 try:
1141 step = "Getting information from database"
1142 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1143 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1144 nsr_lcm = db_nsr["_admin"].get("deployed")
1145 vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]
1146
1147 #TODO check if ns is in a proper status
1148 vca_deployed = nsr_lcm["VCA"].get(vnf_index)
1149 if not vca_deployed:
1150 raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
1151 model_name = vca_deployed.get("model")
1152 application_name = vca_deployed.get("application")
1153 if not model_name or not application_name:
1154 raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
1155 if vca_deployed["operational-status"] != "active":
1156 raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
1157 vnf_index, vca_deployed["operational-status"]))
1158 primitive = db_nslcmop["operationParams"]["primitive"]
1159 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1160 callback = None # self.n2vc_callback
1161 callback_args = () # [db_nsr, db_nslcmop, vnf_index, None]
1162 await self.n2vc.login()
1163 task = asyncio.ensure_future(
1164 self.n2vc.ExecutePrimitive(
1165 model_name,
1166 application_name,
1167 primitive, callback,
1168 *callback_args,
1169 **primitive_params
1170 )
1171 )
1172 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1173 # db_nsr, db_nslcmop, vnf_index))
1174 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1175 # wait until completed with timeout
1176 await asyncio.wait((task,), timeout=300)
1177
1178 result = "FAILED" # by default
1179 result_detail = ""
1180 if task.cancelled():
1181 db_nslcmop["detailed-status"] = "Task has been cancelled"
1182 elif task.done():
1183 exc = task.exception()
1184 if exc:
1185 result_detail = str(exc)
1186 else:
1187 self.logger.debug(logging_text + " task Done")
1188 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1189 result = "COMPLETED"
1190 result_detail = "Done"
1191 else: # timeout
1192 # TODO Should it be cancelled?!!
1193 task.cancel()
1194 result_detail = "timeout"
1195
1196 db_nslcmop_update = {
1197 "detailed-status": result_detail,
1198 "operationState": result,
1199 "statusEnteredTime": time()
1200 }
1201 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1202 return # database update is called inside finally
1203
1204 except (DbException, LcmException) as e:
1205 self.logger.error(logging_text + "Exit Exception {}".format(e))
1206 exc = e
1207 except Exception as e:
1208 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1209 exc = e
1210 finally:
1211 if exc and db_nslcmop:
1212 db_nslcmop_update = {
1213 "detailed-status": "FAILED {}: {}".format(step, exc),
1214 "operationState": "FAILED",
1215 "statusEnteredTime": time(),
1216 }
1217 if db_nslcmop_update:
1218 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1219
1220 async def test(self, param=None):
1221 self.logger.debug("Starting/Ending test task: {}".format(param))
1222
1223 def cancel_tasks(self, topic, _id):
1224 """
1225 Cancel all active tasks of a concrete nsr or vim identified for _id
1226 :param topic: can be ns or vim_account
1227 :param _id: nsr or vim identity
1228 :return: None, or raises an exception if not possible
1229 """
1230 if topic == "ns":
1231 lcm_tasks = self.lcm_ns_tasks
1232 elif topic== "vim_account":
1233 lcm_tasks = self.lcm_vim_tasks
1234 elif topic== "sdn":
1235 lcm_tasks = self.lcm_sdn_tasks
1236
1237 if not lcm_tasks.get(_id):
1238 return
1239 for order_id, tasks_set in lcm_tasks[_id].items():
1240 for task_name, task in tasks_set.items():
1241 result = task.cancel()
1242 if result:
1243 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
1244 lcm_tasks[_id] = {}
1245
1246 async def kafka_ping(self):
1247 self.logger.debug("Task kafka_ping Enter")
1248 consecutive_errors = 0
1249 first_start = True
1250 kafka_has_received = False
1251 self.pings_not_received = 1
1252 while True:
1253 try:
1254 await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
1255 # time between pings are low when it is not received and at starting
1256 wait_time = 5 if not kafka_has_received else 120
1257 if not self.pings_not_received:
1258 kafka_has_received = True
1259 self.pings_not_received += 1
1260 await asyncio.sleep(wait_time, loop=self.loop)
1261 if self.pings_not_received > 10:
1262 raise LcmException("It is not receiving pings from Kafka bus")
1263 consecutive_errors = 0
1264 first_start = False
1265 except LcmException:
1266 raise
1267 except Exception as e:
1268 # if not first_start is the first time after starting. So leave more time and wait
1269 # to allow kafka starts
1270 if consecutive_errors == 8 if not first_start else 30:
1271 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1272 raise
1273 consecutive_errors += 1
1274 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1275 wait_time = 1 if not first_start else 5
1276 await asyncio.sleep(wait_time, loop=self.loop)
1277
1278 async def kafka_read(self):
1279 self.logger.debug("Task kafka_read Enter")
1280 order_id = 1
1281 # future = asyncio.Future()
1282 consecutive_errors = 0
1283 first_start = True
1284 while consecutive_errors < 10:
1285 try:
1286 topics = ("admin", "ns", "vim_account", "sdn")
1287 topic, command, params = await self.msg.aioread(topics, self.loop)
1288 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
1289 consecutive_errors = 0
1290 first_start = False
1291 order_id += 1
1292 if command == "exit":
1293 print("Bye!")
1294 break
1295 elif command.startswith("#"):
1296 continue
1297 elif command == "echo":
1298 # just for test
1299 print(params)
1300 sys.stdout.flush()
1301 continue
1302 elif command == "test":
1303 asyncio.Task(self.test(params), loop=self.loop)
1304 continue
1305
1306 if topic == "admin":
1307 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
1308 self.pings_not_received = 0
1309 continue
1310 elif topic == "ns":
1311 if command == "instantiate":
1312 # self.logger.debug("Deploying NS {}".format(nsr_id))
1313 nslcmop = params
1314 nslcmop_id = nslcmop["_id"]
1315 nsr_id = nslcmop["nsInstanceId"]
1316 task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
1317 if nsr_id not in self.lcm_ns_tasks:
1318 self.lcm_ns_tasks[nsr_id] = {}
1319 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task}
1320 continue
1321 elif command == "terminate":
1322 # self.logger.debug("Deleting NS {}".format(nsr_id))
1323 nslcmop = params
1324 nslcmop_id = nslcmop["_id"]
1325 nsr_id = nslcmop["nsInstanceId"]
1326 self.cancel_tasks(topic, nsr_id)
1327 task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
1328 if nsr_id not in self.lcm_ns_tasks:
1329 self.lcm_ns_tasks[nsr_id] = {}
1330 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task}
1331 continue
1332 elif command == "action":
1333 # self.logger.debug("Update NS {}".format(nsr_id))
1334 nslcmop = params
1335 nslcmop_id = nslcmop["_id"]
1336 nsr_id = nslcmop["nsInstanceId"]
1337 task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
1338 if nsr_id not in self.lcm_ns_tasks:
1339 self.lcm_ns_tasks[nsr_id] = {}
1340 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task}
1341 continue
1342 elif command == "show":
1343 try:
1344 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1345 print(
1346 "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
1347 "{}\n deploy: {}\n tasks: {}".format(
1348 nsr_id, db_nsr["operational-status"],
1349 db_nsr["config-status"], db_nsr["detailed-status"],
1350 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
1351 except Exception as e:
1352 print("nsr {} not found: {}".format(nsr_id, e))
1353 sys.stdout.flush()
1354 continue
1355 elif command == "deleted":
1356 continue # TODO cleaning of task just in case should be done
1357 elif topic == "vim_account":
1358 vim_id = params["_id"]
1359 if command == "create":
1360 task = asyncio.ensure_future(self.vim_create(params, order_id))
1361 if vim_id not in self.lcm_vim_tasks:
1362 self.lcm_vim_tasks[vim_id] = {}
1363 self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task}
1364 continue
1365 elif command == "delete":
1366 self.cancel_tasks(topic, vim_id)
1367 task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
1368 if vim_id not in self.lcm_vim_tasks:
1369 self.lcm_vim_tasks[vim_id] = {}
1370 self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task}
1371 continue
1372 elif command == "show":
1373 print("not implemented show with vim_account")
1374 sys.stdout.flush()
1375 continue
1376 elif command == "edit":
1377 task = asyncio.ensure_future(self.vim_edit(vim_id, order_id))
1378 if vim_id not in self.lcm_vim_tasks:
1379 self.lcm_vim_tasks[vim_id] = {}
1380 self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task}
1381 continue
1382 elif topic == "sdn":
1383 _sdn_id = params["_id"]
1384 if command == "create":
1385 task = asyncio.ensure_future(self.sdn_create(params, order_id))
1386 if _sdn_id not in self.lcm_sdn_tasks:
1387 self.lcm_sdn_tasks[_sdn_id] = {}
1388 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task}
1389 continue
1390 elif command == "delete":
1391 self.cancel_tasks(topic, _sdn_id)
1392 task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
1393 if _sdn_id not in self.lcm_sdn_tasks:
1394 self.lcm_sdn_tasks[_sdn_id] = {}
1395 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task}
1396 continue
1397 elif command == "edit":
1398 task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id))
1399 if _sdn_id not in self.lcm_sdn_tasks:
1400 self.lcm_sdn_tasks[_sdn_id] = {}
1401 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task}
1402 continue
1403 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1404 except Exception as e:
1405 # if not first_start is the first time after starting. So leave more time and wait
1406 # to allow kafka starts
1407 if consecutive_errors == 8 if not first_start else 30:
1408 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1409 raise
1410 consecutive_errors += 1
1411 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1412 wait_time = 2 if not first_start else 5
1413 await asyncio.sleep(wait_time, loop=self.loop)
1414
1415 # self.logger.debug("Task kafka_read terminating")
1416 self.logger.debug("Task kafka_read exit")
1417
1418 def start(self):
1419 self.loop = asyncio.get_event_loop()
1420 self.loop.run_until_complete(asyncio.gather(
1421 self.kafka_read(),
1422 self.kafka_ping()
1423 ))
1424 # TODO
1425 # self.logger.debug("Terminating cancelling creation tasks")
1426 # self.cancel_tasks("ALL", "create")
1427 # timeout = 200
1428 # while self.is_pending_tasks():
1429 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1430 # await asyncio.sleep(2, loop=self.loop)
1431 # timeout -= 2
1432 # if not timeout:
1433 # self.cancel_tasks("ALL", "ALL")
1434 self.loop.close()
1435 self.loop = None
1436 if self.db:
1437 self.db.db_disconnect()
1438 if self.msg:
1439 self.msg.disconnect()
1440 if self.fs:
1441 self.fs.fs_disconnect()
1442
1443
1444 def read_config_file(self, config_file):
1445 # TODO make a [ini] + yaml inside parser
1446 # the configparser library is not suitable, because it does not admit comments at the end of line,
1447 # and not parse integer or boolean
1448 try:
1449 with open(config_file) as f:
1450 conf = yaml.load(f)
1451 for k, v in environ.items():
1452 if not k.startswith("OSMLCM_"):
1453 continue
1454 k_items = k.lower().split("_")
1455 c = conf
1456 try:
1457 for k_item in k_items[1:-1]:
1458 if k_item in ("ro", "vca"):
1459 # put in capital letter
1460 k_item = k_item.upper()
1461 c = c[k_item]
1462 if k_items[-1] == "port":
1463 c[k_items[-1]] = int(v)
1464 else:
1465 c[k_items[-1]] = v
1466 except Exception as e:
1467 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1468
1469 return conf
1470 except Exception as e:
1471 self.logger.critical("At config file '{}': {}".format(config_file, e))
1472 exit(1)
1473
1474
1475 if __name__ == '__main__':
1476
1477 config_file = "lcm.cfg"
1478 lcm = Lcm(config_file)
1479
1480 lcm.start()