LCM update to new N2VC version: message to the callback
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 from n2vc import version as N2VC_version
22 # import os.path
23 # import time
24
25 from copy import deepcopy
26 from http import HTTPStatus
27 from time import time
28
29
class LcmException(Exception):
    """Error raised by the LCM code itself (e.g. invalid configuration or unreadable descriptor files)"""
32
33
34 class Lcm:
35
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: handed to self.read_config_file(). The resulting two level dictionary is read for the
        sections 'global', 'RO', 'VCA', 'database', 'storage' and 'message'
    :raises LcmException: on a not supported N2VC version, an unknown 'driver' value or when a database,
        storage or message connection fails
    :return: None
    """
    # "import logging" alone does not guarantee that the "handlers" submodule is loaded, so import explicitly
    from logging.handlers import RotatingFileHandler

    self.db = None
    self.msg = None
    self.fs = None
    self.pings_not_received = 1

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging of the main 'lcm' logger: to file if 'logfile' is configured, otherwise to a stream handler
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = RotatingFileHandler(config["global"]["logfile"],
                                           maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules; the logger name is stored in each section so the driver receives it on connect
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = RotatingFileHandler(config[k1]["logfile"],
                                               maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )
    # check version of N2VC
    # TODO enhance with int conversion or from distutils.version import LooseVersion
    # or with list(map(int, version.split(".")))
    # NOTE(review): this is a plain string comparison, so e.g. "0.0.10" sorts before "0.0.2"
    if N2VC_version < "0.0.2":
        raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
    try:
        if config["database"]["driver"] == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif config["database"]["driver"] == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                config["database"]["driver"]))

        if config["storage"]["driver"] == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                config["storage"]["driver"]))

        if config["message"]["driver"] == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif config["message"]["driver"] == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending *message* driver (the storage driver was reported here by mistake)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                config["message"]["driver"]))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e)) from e
145
def update_db(self, item, _id, _desc):
    """
    Store _desc for the entry _id by calling self.db.replace. A DbException is logged, not raised
    :param item: first argument given to self.db.replace (callers in this file pass e.g. "nsrs", "vim_accounts")
    :param _id: identifier of the entry
    :param _desc: content to store
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
151
def update_db_2(self, item, _id, _desc):
    """
    Update the entry selected by {"_id": _id} by calling self.db.set_one. A DbException is logged, not raised
    :param item: first argument given to self.db.set_one
    :param _id: identifier of the entry, used to build the filter
    :param _desc: content handed to self.db.set_one
    :return: None
    """
    entry_filter = {"_id": _id}
    try:
        self.db.set_one(item, entry_filter, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
157
async def vim_create(self, vim_content, order_id):
    """
    Creates a vim at RO and attaches it to the RO tenant, storing the result at database "vim_accounts"
    :param vim_content: vim account description. "_id", "vim_type", "vim_tenant_name", "vim_user",
        "vim_password" and "config" are read
    :param order_id: not used by this method
    :return: the RO uuid of the created vim, or None if it fails. On failure the database entry is marked
        with operationalState "ERROR" and a detailed-status text
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    # 'step' describes the ongoing action; it is stored as part of the error text if something fails
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deployed" not in db_vim["_admin"]:
            db_vim["_admin"]["deployed"] = {}
        db_vim["_admin"]["deployed"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        vim_RO = deepcopy(vim_content)
        # internal fields and tenant credentials are not part of the RO vim descriptor
        for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        vim_RO.pop("vim_user", None)
        vim_RO.pop("vim_password", None)
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
        self.update_db("vim_accounts", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
                  "vim_username": vim_content["vim_user"],
                  "vim_password": vim_content["vim_password"],
                  "config": vim_content["config"]
                  }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
213
async def vim_edit(self, vim_content, order_id):
    """
    Edits at RO a vim and its tenant account, using the RO uuid stored at database "vim_accounts"
    :param vim_content: fields to edit plus "_id". Only the provided fields are sent to RO
    :param order_id: not used by this method
    :return: the RO uuid of the vim; None if the database entry has no RO information (nothing is edited) or
        if it fails. On failure the database entry is marked with operationalState "ERROR"
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # stays None when the vim is not deployed at RO
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name",
                        "vim_user", "vim_password"):
                vim_RO.pop(key, None)
            # an edition can be partial, so "vim_type" is renamed only when it is provided
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_RO = {k: vim_content[k] for k in ("vim_tenant_name", "vim_password", "config")
                      if k in vim_content}
            if "vim_user" in vim_content:
                # goes into the RO descriptor under the key used by vim_create; the input is left untouched
                vim_RO["vim_username"] = vim_content["vim_user"]
            if vim_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
265
async def vim_delete(self, vim_id, order_id):
    """
    Detaches and deletes a vim from RO (if it has RO information) and deletes it from database "vim_accounts"
    :param vim_id: database _id of the vim account
    :param order_id: not used by this method
    :return: None. On failure the database entry is kept and marked with operationalState "ERROR"
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        step = "Deleting vim from db"
        self.db.del_one("vim_accounts", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # "_admin" is optional here (see the check above), so create it before writing the error
            if not db_vim.get("_admin"):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
312
async def sdn_create(self, sdn_content, order_id):
    """
    Creates a sdn controller at RO, storing the result at database "sdns"
    :param sdn_content: sdn description. It is sent to RO without the "_id", "_admin", "schema_version" and
        "schema_type" fields
    :param order_id: not used by this method
    :return: the RO uuid of the created sdn, or None if it fails. On failure the database entry is marked
        with operationalState "ERROR" and a detailed-status text
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    # 'step' describes the ongoing action; it is stored as part of the error text if something fails
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deployed" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deployed"] = {}
        db_sdn["_admin"]["deployed"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
354
async def sdn_edit(self, sdn_content, order_id):
    """
    Edits a sdn controller at RO, using the RO uuid stored at database "sdns"
    :param sdn_content: fields to edit plus "_id". It is sent to RO without the "_id", "_admin",
        "schema_version" and "schema_type" fields
    :param order_id: not used by this method
    :return: the RO uuid of the sdn; None if the database entry has no RO information (nothing is edited) or
        if it fails. On failure the database entry is marked with operationalState "ERROR"
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # stays None when the sdn is not deployed at RO
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
392
async def sdn_delete(self, sdn_id, order_id):
    """
    Deletes a sdn controller from RO (if it has RO information) and deletes it from database "sdns"
    :param sdn_id: database _id of the sdn
    :param order_id: not used by this method
    :return: None. On failure the database entry is kept and marked with operationalState "ERROR"
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        step = "Deleting sdn from db"
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # "_admin" is optional here (see the check above), so create it before writing the error
            if not db_sdn.get("_admin"):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
430
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor for RO from an OSM IM vnfd. The input is not modified: a deep copy is made,
    "_id" and "_admin" are removed and every vdu "cloud-init-file" is replaced by a "cloud-init" entry holding
    the content of that file, read through self.fs
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: the new descriptor
    :raises LcmException: if the storage reports an FsException
    """
    open_file = None
    try:
        ro_descriptor = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_descriptor.pop(internal_key, None)
        if new_id:
            ro_descriptor["id"] = new_id
        for vdu in ro_descriptor["vdu"]:
            if "cloud-init-file" not in vdu:
                continue
            # "_admin" was removed from the copy, so the storage location comes from the input vnfd
            storage = vnfd["_admin"]["storage"]
            file_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu["cloud-init-file"]
            )
            open_file = self.fs.file_open(file_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8.
            # If fails convert to base 64 or similar
            file_content = open_file.read()
            open_file.close()
            open_file = None
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = file_content
        return ro_descriptor
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        # only reached with an open file when reading failed
        if open_file:
            open_file.close()
466
def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, vnf_member_index, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param db_nsr: nsr database content. It is modified and stored at database "nsrs"
    :param db_nslcmop: nslcmop database content. It is modified and stored at database "nslcmops"
    :param vnf_member_index: NSD vnf-member-index
    :param task: None for charm status change, or task for completion task callback
    :return: None. Exceptions are logged, never raised
    """
    nsr_id = None
    nslcmop_id = None
    update_nsr = update_nslcmop = False
    try:
        nsr_id = db_nsr["_id"]
        nslcmop_id = db_nslcmop["_id"]
        nsr_lcm = db_nsr["_admin"]["deployed"]
        ns_action = db_nslcmop["lcmOperationType"]
        logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
                                                                               vnf_member_index)

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                # TODO update db_nslcmop
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    if ns_action in ("instantiate", "terminate"):
                        # no return: the error is folded into the global status computed below
                        nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                        nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                    elif ns_action == "action":
                        db_nslcmop["operationState"] = "FAILED"
                        db_nslcmop["detailed-status"] = str(exc)
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                        return

                else:
                    self.logger.debug(logging_text + " task Done")
                    # TODO revise with Adam if action is finished and ok when task is done
                    if ns_action == "action":
                        db_nslcmop["operationState"] = "COMPLETED"
                        db_nslcmop["detailed-status"] = "Done"
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={}".format(status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = status
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return

        # compute the global configuration status from the status of every VCA entry
        all_active = True
        status_map = {}  # number of VCA entries at each operational-status
        n2vc_error_text = []  # contain text error list. If empty no one is in error status
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            # independent check: as an 'elif' of the one above it could never be reached
            if vca_status in ("error", "blocked"):
                n2vc_error_text.append("member_vnf_index={} {}: {}".format(vnf_index, vca_status,
                                                                           vca_info["detailed-status"]))

        if all_active:
            self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index))
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailed-status"] = "Done"
            db_nslcmop["statusEnteredTime"] = time()
        elif n2vc_error_text:
            db_nsr["config-status"] = "failed"
            error_text = "fail configuring " + ";".join(n2vc_error_text)
            db_nsr["detailed-status"] = error_text
            db_nslcmop["operationState"] = "FAILED_TEMP"
            db_nslcmop["detailed-status"] = error_text
            db_nslcmop["statusEnteredTime"] = time()
        else:
            cs = "configuring: " + ", ".join(
                "{}: {}".format(vca_status, num) for vca_status, num in status_map.items())
            db_nsr["config-status"] = cs
            db_nsr["detailed-status"] = cs
            db_nslcmop["detailed-status"] = cs
        update_nsr = update_nslcmop = True

    except Exception as e:
        self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
    finally:
        # runs also for the early 'return' paths above, which only set the flags they need
        try:
            if update_nslcmop:
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
            if update_nsr:
                self.update_db("nsrs", nsr_id, db_nsr)
        except Exception as e:
            self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
                vnf_member_index, e), exc_info=True)
591
def ns_params_2_RO(self, ns_params):
    """
    Builds the RO ns descriptor from the OSM ns instantiate params
    :param ns_params: OSM instantiate params. "vimAccountId" is mandatory; "ssh-authorized-key", "vnf" and
        "vld" are optional
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a referenced vim account is not at operationalState "ENABLED"
    """
    if not ns_params:
        return None

    ro_id_by_vim_account = {}  # cache, so each vim account is read from database only once

    def vim_account_2_RO(vim_account):
        """Returns the RO uuid stored at database for this vim account"""
        try:
            return ro_id_by_vim_account[vim_account]
        except KeyError:
            pass
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        # if db_vim["_admin"]["operationalState"] == "PROCESSING":
        #     #TODO check if VIM is creating and wait
        vim_state = db_vim["_admin"]["operationalState"]
        if vim_state != "ENABLED":
            raise LcmException("VIM={} is not available. operationalSstatus={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        ro_id = db_vim["_admin"]["deployed"]["RO"]
        ro_id_by_vim_account[vim_account] = ro_id
        return ro_id

    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
        "vnfs": {},
        "networks": {},
    }
    if ns_params.get("ssh-authorized-key"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}

    # only vnfs with their own vim account produce an entry
    for vnf in ns_params.get("vnf") or ():
        if "vimAccountId" not in vnf:
            continue
        RO_ns_params["vnfs"][vnf["member-vnf-index"]] = {"datacenter": vim_account_2_RO(vnf["vimAccountId"])}

    for vld in ns_params.get("vld") or ():
        ro_network = {}
        if "ip-profile" in vld:
            ro_network["ip-profile"] = vld["ip-profile"]
        if "vim-network-name" in vld:
            vim_network = vld["vim-network-name"]
            if isinstance(vim_network, dict):
                # one site per vim account: {vim_account: vim network name}
                sites = [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                         for vim_account, vim_net in vim_network.items()]
            else:  # isinstance str
                sites = [{"netmap-use": vim_network}]
            ro_network["sites"] = sites
        if ro_network:
            RO_ns_params["networks"][vld["name"]] = ro_network
    return RO_ns_params
649
650 async def ns_instantiate(self, nsr_id, nslcmop_id):
651 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
652 self.logger.debug(logging_text + "Enter")
653 # get all needed from database
654 db_nsr = None
655 db_nslcmop = None
656 exc = None
657 step = "Getting nsr, nslcmop, RO_vims from db"
658 try:
659 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
660 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
661 nsd = db_nsr["nsd"]
662 nsr_name = db_nsr["name"] # TODO short-name??
663
664 needed_vnfd = {}
665 for c_vnf in nsd["constituent-vnfd"]:
666 vnfd_id = c_vnf["vnfd-id-ref"]
667 if vnfd_id not in needed_vnfd:
668 step = "Getting vnfd={} from db".format(vnfd_id)
669 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
670
671 nsr_lcm = db_nsr["_admin"].get("deployed")
672 if not nsr_lcm:
673 nsr_lcm = db_nsr["_admin"]["deployed"] = {
674 "id": nsr_id,
675 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
676 "nsr_ip": {},
677 "VCA": {},
678 }
679 db_nsr["detailed-status"] = "creating"
680 db_nsr["operational-status"] = "init"
681
682 RO = ROclient.ROClient(self.loop, **self.ro_config)
683
684 # get vnfds, instantiate at RO
685 for vnfd_id, vnfd in needed_vnfd.items():
686 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
687 self.logger.debug(logging_text + step)
688 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
689
690 # look if present
691 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
692 if vnfd_list:
693 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
694 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
695 vnfd_id, vnfd_list[0]["uuid"]))
696 else:
697 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
698 desc = await RO.create("vnfd", descriptor=vnfd_RO)
699 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
700 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
701 self.update_db("nsrs", nsr_id, db_nsr)
702
703 # create nsd at RO
704 nsd_id = nsd["id"]
705 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
706 self.logger.debug(logging_text + step)
707
708 nsd_id_RO = nsd_id + "." + nsd_id[:200]
709 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
710 if nsd_list:
711 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
712 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
713 nsd_id, nsd_list[0]["uuid"]))
714 else:
715 nsd_RO = deepcopy(nsd)
716 nsd_RO["id"] = nsd_id_RO
717 nsd_RO.pop("_id", None)
718 nsd_RO.pop("_admin", None)
719 for c_vnf in nsd_RO["constituent-vnfd"]:
720 vnfd_id = c_vnf["vnfd-id-ref"]
721 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
722 desc = await RO.create("nsd", descriptor=nsd_RO)
723 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
724 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
725 self.update_db("nsrs", nsr_id, db_nsr)
726
727 # Crate ns at RO
728 # if present use it unless in error status
729 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
730 if RO_nsr_id:
731 try:
732 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
733 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
734 desc = await RO.show("ns", RO_nsr_id)
735 except ROclient.ROClientException as e:
736 if e.http_code != HTTPStatus.NOT_FOUND:
737 raise
738 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
739 if RO_nsr_id:
740 ns_status, ns_status_info = RO.check_ns_status(desc)
741 nsr_lcm["RO"]["nsr_status"] = ns_status
742 if ns_status == "ERROR":
743 step = db_nsr["detailed-status"] = "Deleting ns at RO"
744 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
745 await RO.delete("ns", RO_nsr_id)
746 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
747
748 if not RO_nsr_id:
749 step = db_nsr["detailed-status"] = "Creating ns at RO"
750 self.logger.debug(logging_text + step)
751 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
752 desc = await RO.create("ns", descriptor=RO_ns_params,
753 name=db_nsr["name"],
754 scenario=nsr_lcm["RO"]["nsd_id"])
755 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
756 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
757 nsr_lcm["RO"]["nsr_status"] = "BUILD"
758 self.update_db("nsrs", nsr_id, db_nsr)
759
760 # wait until NS is ready
761 step = ns_status_detailed = "Waiting ns ready at RO"
762 db_nsr["detailed-status"] = ns_status_detailed
763 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
764 deployment_timeout = 2*3600 # Two hours
765 while deployment_timeout > 0:
766 desc = await RO.show("ns", RO_nsr_id)
767 ns_status, ns_status_info = RO.check_ns_status(desc)
768 nsr_lcm["RO"]["nsr_status"] = ns_status
769 if ns_status == "ERROR":
770 raise ROclient.ROClientException(ns_status_info)
771 elif ns_status == "BUILD":
772 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
773 self.update_db("nsrs", nsr_id, db_nsr)
774 elif ns_status == "ACTIVE":
775 step = "Getting ns VNF management IP address"
776 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
777 break
778 else:
779 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
780
781 await asyncio.sleep(5, loop=self.loop)
782 deployment_timeout -= 5
783 if deployment_timeout <= 0:
784 raise ROclient.ROClientException("Timeout waiting ns to be ready")
785 db_nsr["detailed-status"] = "Configuring vnfr"
786 self.update_db("nsrs", nsr_id, db_nsr)
787
788 # The parameters we'll need to deploy a charm
789 number_to_configure = 0
790
791 def deploy():
792 """An inner function to deploy the charm from either vnf or vdu
793 """
794
795 # Login to the VCA.
796 # if number_to_configure == 0:
797 # self.logger.debug("Logging into N2VC...")
798 # task = asyncio.ensure_future(self.n2vc.login())
799 # yield from asyncio.wait_for(task, 30.0)
800 # self.logger.debug("Logged into N2VC!")
801
802 ## await self.n2vc.login()
803
804 # Note: The charm needs to exist on disk at the location
805 # specified by charm_path.
806 base_folder = vnfd["_admin"]["storage"]
807 storage_params = self.fs.get_params()
808 charm_path = "{}{}/{}/charms/{}".format(
809 storage_params["path"],
810 base_folder["folder"],
811 base_folder["pkg-dir"],
812 proxy_charm
813 )
814
815 # Setup the runtime parameters for this VNF
816 params['rw_mgmt_ip'] = nsr_lcm['nsr_ip']["vnf"][vnf_index]
817
818 # ns_name will be ignored in the current version of N2VC
819 # but will be implemented for the next point release.
820 model_name = 'default'
821 application_name = self.n2vc.FormatApplicationName(
822 nsr_name,
823 vnf_index,
824 vnfd['name'],
825 )
826
827 nsr_lcm["VCA"][vnf_index] = {
828 "model": model_name,
829 "application": application_name,
830 "operational-status": "init",
831 "detailed-status": "",
832 "vnfd_id": vnfd_id,
833 }
834
835 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
836 task = asyncio.ensure_future(
837 self.n2vc.DeployCharms(
838 model_name, # The network service name
839 application_name, # The application name
840 vnfd, # The vnf descriptor
841 charm_path, # Path to charm
842 params, # Runtime params, like mgmt ip
843 {}, # for native charms only
844 self.n2vc_callback, # Callback for status changes
845 db_nsr, # Callback parameter
846 db_nslcmop,
847 vnf_index, # Callback parameter
848 None, # Callback parameter (task)
849 )
850 )
851 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
852 db_nsr, db_nslcmop, vnf_index))
853 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
854
855 # TODO: Make this call inside deploy()
856 # Login to the VCA. If there are multiple calls to login(),
857 # subsequent calls will be a nop and return immediately.
858 await self.n2vc.login()
859
860 step = "Looking for needed vnfd to configure"
861 self.logger.debug(logging_text + step)
862 for c_vnf in nsd["constituent-vnfd"]:
863 vnfd_id = c_vnf["vnfd-id-ref"]
864 vnf_index = str(c_vnf["member-vnf-index"])
865 vnfd = needed_vnfd[vnfd_id]
866
867 # Check if this VNF has a charm configuration
868 vnf_config = vnfd.get("vnf-configuration")
869
870 if vnf_config and vnf_config.get("juju"):
871 proxy_charm = vnf_config["juju"]["charm"]
872 params = {}
873
874 if proxy_charm:
875 if 'initial-config-primitive' in vnf_config:
876 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
877
878 deploy()
879 number_to_configure += 1
880
881 # Deploy charms for each VDU that supports one.
882 for vdu in vnfd['vdu']:
883 vdu_config = vdu.get('vdu-configuration')
884 proxy_charm = None
885 params = {}
886
887 if vdu_config and vdu_config.get("juju"):
888 proxy_charm = vdu_config["juju"]["charm"]
889
890 if 'initial-config-primitive' in vdu_config:
891 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
892
893 if proxy_charm:
894 deploy()
895 number_to_configure += 1
896
897 if number_to_configure:
898 db_nsr["config-status"] = "configuring"
899 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
900 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
901 else:
902 db_nslcmop["operationState"] = "COMPLETED"
903 db_nslcmop["detailed-status"] = "done"
904 db_nsr["config-status"] = "configured"
905 db_nsr["detailed-status"] = "done"
906 db_nsr["operational-status"] = "running"
907 self.update_db("nsrs", nsr_id, db_nsr)
908 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
909 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
910 return nsr_lcm
911
912 except (ROclient.ROClientException, DbException, LcmException) as e:
913 self.logger.error(logging_text + "Exit Exception {}".format(e))
914 exc = e
915 except Exception as e:
916 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
917 exc = e
918 finally:
919 if exc:
920 if db_nsr:
921 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
922 db_nsr["operational-status"] = "failed"
923 self.update_db("nsrs", nsr_id, db_nsr)
924 if db_nslcmop:
925 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
926 db_nslcmop["operationState"] = "FAILED"
927 db_nslcmop["statusEnteredTime"] = time()
928 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
929
async def ns_terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: remove its charms through N2VC and delete its ns, nsd and vnfd at RO.
    The outcome is written to the "nsrs" and "nslcmops" database records, unless the operation asks for
    "autoremove", in which case both records are deleted on success.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record that describes this operation
    :return: None. Errors are not raised; they are logged and stored at the database records
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    # The "finally" clause reads both on every exit path (early return, early exception, autoremove),
    # so they must exist before entering the "try"
    db_nsr_update = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop from db"
    failed_detail = []   # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing deployed, nothing to do. No database record is modified
            return
        # TODO check if VIM is creating and wait

        db_nsr_update = {
            "operational-status": "terminating",
            "config-status": "terminating",
            "detailed-status": "Deleting charms",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Launch charm removal in background; the tasks are awaited after the RO deletion
        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                        )
                    )
                    vca_task_list.append(task)
                    vca_task_dict[vnf_index] = task
                    self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            # best effort: a failure launching the removal must not prevent the RO deletion
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Collect the result of the charm removal tasks launched above
        if vca_task_list:
            await asyncio.wait(vca_task_list, timeout=300)
        for vnf_index, task in vca_task_dict.items():
            if task.cancelled():
                failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
            elif task.done():
                # kept in its own variable: "exc" is reserved for the exception that aborts this
                # method and is inspected at the "finally" clause
                task_exc = task.exception()
                if task_exc:
                    failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, task_exc))
                else:
                    nsr_lcm["VCA"][vnf_index] = None
            else:  # timeout
                # TODO Should it be cancelled?!!
                task.cancel()
                failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update = {
                "operational-status": "failed",
                "detailed-status": "Deletion errors " + "; ".join(failed_detail),
                "_admin": {"deployed": nsr_lcm, }
            }
            db_nslcmop_update = {
                "detailed-status": "; ".join(failed_detail),
                "operationState": "FAILED",
                "statusEnteredTime": time()
            }
        elif db_nslcmop["operationParams"].get("autoremove"):
            # the records are going to be deleted: there is nothing left to update at "finally"
            db_nsr_update = None
            self.db.del_one("nsrs", {"_id": nsr_id})
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
        else:
            db_nsr_update = {
                "operational-status": "terminated",
                "detailed-status": "Done",
                "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
            }
            db_nslcmop_update = {
                "detailed-status": "Done",
                "operationState": "COMPLETED",
                "statusEnteredTime": time()
            }
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1104
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute a primitive over the charm deployed for one vnf of a network service.
    The primitive name, its parameters and the target vnf_member_index are read from the "operationParams"
    of the "nslcmops" record. The result is written to that same record.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record that describes this operation
    :return: None. Errors are not raised; they are logged and stored at the "nslcmops" record
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        step = "Checking charm deployment"
        # tolerate a missing "deployed"/"VCA" entry so that it is reported as "not deployed"
        # instead of an unexpected TypeError
        nsr_lcm = db_nsr["_admin"].get("deployed") or {}
        vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]

        # TODO check if ns is in a proper status
        vca_deployed = (nsr_lcm.get("VCA") or {}).get(vnf_index)
        if not vca_deployed:
            raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
        if vca_deployed.get("operational-status") != "active":
            raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
                vnf_index, vca_deployed.get("operational-status")))
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        step = "Executing primitive '{}'".format(primitive)
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, vnf_index, None]
        await self.n2vc.login()
        task = asyncio.ensure_future(
            self.n2vc.ExecutePrimitive(
                model_name,
                application_name,
                primitive, callback,
                *callback_args,
                **primitive_params
            )
        )
        # wait until completed with timeout
        await asyncio.wait((task,), timeout=300)

        result = "FAILED"  # by default
        result_detail = ""
        if task.cancelled():
            result_detail = "Task has been cancelled"
        elif task.done():
            # kept apart from "exc", which is reserved for errors of this method and would make
            # the "finally" clause overwrite the detail stored here
            task_exc = task.exception()
            if task_exc:
                result_detail = str(task_exc)
            else:
                self.logger.debug(logging_text + " task Done")
                # TODO revise with Adam if action is finished and ok when task is done or callback is needed
                result = "COMPLETED"
                result_detail = "Done"
        else:  # timeout
            # TODO Should it be cancelled?!!
            task.cancel()
            result_detail = "timeout"

        db_nslcmop_update = {
            "detailed-status": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1192
async def test(self, param=None):
    """
    Dummy task: it only writes a debug log entry that contains the received parameter
    :param param: any value, it is just logged
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
1195
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified by _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id: nsr, vim or sdn identity
    :return: None
    :raises LcmException: if topic is not one of the supported ones
    """
    registries = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    if topic not in registries:
        # explicit error instead of failing later with an unbound local variable
        raise LcmException("cancel_tasks: unknown topic '{}'".format(topic))
    lcm_tasks = registries[topic]

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() returns False when the task is already done or cancelled
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
1218
async def kafka_ping(self):
    """
    Periodically write a "ping" message to the "admin" topic to check that the message bus is alive.
    self.pings_not_received is incremented on every ping sent; it is expected to be reset to 0 by the
    task that reads the "admin" topic when the ping comes back.
    :return: never returns normally
    :raises LcmException: if more than 10 consecutive pings are not received back
    :raises Exception: the last write error, once too many consecutive errors have happened
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # time between pings are low when it is not received and at starting
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # inside a coroutine sleep() already uses the running loop
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # While first_start is set nothing has succeeded yet since starting, so more errors
            # are tolerated to allow kafka to start.
            # The limit is computed apart: "a == 8 if c else 30" parses as "(a == 8) if c else 30"
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 1 if not first_start else 5
            await asyncio.sleep(wait_time)
1250
async def kafka_read(self):
    """
    Main loop: read commands from the "admin", "ns", "vim_account" and "sdn" topics and launch the
    task that serves each one. Launched tasks are annotated at self.lcm_ns_tasks, self.lcm_vim_tasks
    or self.lcm_sdn_tasks so that they can be cancelled later.
    :return: None, when the "exit" command is received
    :raises Exception: the last error, once too many consecutive errors have happened
    """

    def launch(registry, key, order, name, coro):
        """Schedule coro and annotate its task as the only entry of registry[key][order]"""
        registry.setdefault(key, {})[order] = {name: asyncio.ensure_future(coro)}

    self.logger.debug("Task kafka_read Enter")
    order_id = 1
    consecutive_errors = 0
    first_start = True
    while consecutive_errors < 10:
        try:
            topics = ("admin", "ns", "vim_account", "sdn")
            topic, command, params = await self.msg.aioread(topics, self.loop)
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            first_start = False
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
                continue

            if topic == "admin":
                if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                    self.pings_not_received = 0
                    continue
            elif topic == "ns":
                if command == "instantiate":
                    nslcmop_id = params["_id"]
                    nsr_id = params["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_instantiate",
                           self.ns_instantiate(nsr_id, nslcmop_id))
                    continue
                elif command == "terminate":
                    nslcmop_id = params["_id"]
                    nsr_id = params["nsInstanceId"]
                    # any task still running over this ns is cancelled before terminating
                    self.cancel_tasks(topic, nsr_id)
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_terminate",
                           self.ns_terminate(nsr_id, nslcmop_id))
                    continue
                elif command == "action":
                    nslcmop_id = params["_id"]
                    nsr_id = params["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_action",
                           self.ns_action(nsr_id, nslcmop_id))
                    continue
                elif command == "show":
                    # NOTE(review): assumes the message carries the nsr _id, either as the bare value
                    # or under "_id" - confirm against the producer of this command
                    nsr_id = params["_id"] if isinstance(params, dict) else params
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}\n    detailed-status: "
                            "{}\n    deploy: {}\n    tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
                elif command == "deleted":
                    continue  # TODO cleaning of task just in case should be done
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_create",
                           self.vim_create(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_delete",
                           self.vim_delete(vim_id, order_id))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_edit",
                           self.vim_edit(vim_id, order_id))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_create",
                           self.sdn_create(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_delete",
                           self.sdn_delete(_sdn_id, order_id))
                    continue
                elif command == "edit":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_edit",
                           self.sdn_edit(_sdn_id, order_id))
                    continue
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            # While first_start is set nothing has been read yet since starting, so more errors
            # are tolerated to allow kafka to start.
            # The limit is computed apart: "a == 8 if c else 30" parses as "(a == 8) if c else 30"
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 2 if not first_start else 5
            # inside a coroutine sleep() already uses the running loop
            await asyncio.sleep(wait_time)

    self.logger.debug("Task kafka_read exit")
1390
def start(self):
    """
    Run the kafka_read and kafka_ping tasks until both finish, then release the event loop and the
    database, message and storage connections. The cleanup is also done when any of the tasks raises.
    :return: None
    :raises Exception: whatever exception ends kafka_read or kafka_ping
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
    finally:
        # TODO cancel the creation tasks and wait (with a timeout) for the pending ones
        # before closing the loop
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1415
1416
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and override it with the OSMLCM_* environment variables.
    A variable OSMLCM_<SECTION>_<KEY> sets conf[<section>][<key>]; sections "ro" and "vca" are looked up
    in capital letters and a key named "port" is converted to integer.
    The process exits with code 1 if the file cannot be read or parsed.
    :param config_file: path of the yaml configuration file
    :return: the configuration content, as loaded from yaml
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: plain yaml is enough for a configuration file, and yaml.load() without an
            # explicit Loader can build arbitrary objects and is rejected by recent PyYAML versions
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down the intermediate sections; the last item is the key to set
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                # a wrong variable is ignored; it must not prevent starting
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        # sys.exit instead of the exit() helper, which is only installed by the site module
        sys.exit(1)
1446
1447
if __name__ == "__main__":
    # Script entry point: load the configuration from the working directory and run until the
    # LCM tasks finish
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)
    lcm.start()