adding flake8 test
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import logging
8 import logging.handlers
9 import getopt
10 import functools
11 import sys
12 from osm_common import dbmemory
13 from osm_common import dbmongo
14 from osm_common import fslocal
15 from osm_common import msglocal
16 from osm_common import msgkafka
17 from osm_common.dbbase import DbException
18 from osm_common.fsbase import FsException
19 from osm_common.msgbase import MsgException
20 from os import environ, path
21 from n2vc.vnf import N2VC
22 from n2vc import version as N2VC_version
23
24 from copy import deepcopy
25 from http import HTTPStatus
26 from time import time
27
28
29 __author__ = "Alfonso Tierno"
30
31
32 class LcmException(Exception):
33 pass
34
35
36 class Lcm:
37
38 def __init__(self, config_file):
39 """
40 Init, Connect to database, filesystem storage, and messaging
41 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
42 :return: None
43 """
44
45 self.db = None
46 self.msg = None
47 self.fs = None
48 self.pings_not_received = 1
49
50 # contains created tasks/futures to be able to cancel
51 self.lcm_ns_tasks = {}
52 self.lcm_vim_tasks = {}
53 self.lcm_sdn_tasks = {}
54 # logging
55 self.logger = logging.getLogger('lcm')
56 # load configuration
57 config = self.read_config_file(config_file)
58 self.config = config
59 self.ro_config = {
60 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
61 "tenant": config.get("tenant", "osm"),
62 "logger_name": "lcm.ROclient",
63 "loglevel": "ERROR",
64 }
65
66 self.vca = config["VCA"] # TODO VCA
67 self.loop = None
68
69 # logging
70 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
71 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
72 config["database"]["logger_name"] = "lcm.db"
73 config["storage"]["logger_name"] = "lcm.fs"
74 config["message"]["logger_name"] = "lcm.msg"
75 if "logfile" in config["global"]:
76 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
77 maxBytes=100e6, backupCount=9, delay=0)
78 file_handler.setFormatter(log_formatter_simple)
79 self.logger.addHandler(file_handler)
80 else:
81 str_handler = logging.StreamHandler()
82 str_handler.setFormatter(log_formatter_simple)
83 self.logger.addHandler(str_handler)
84
85 if config["global"].get("loglevel"):
86 self.logger.setLevel(config["global"]["loglevel"])
87
88 # logging other modules
89 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
90 config[k1]["logger_name"] = logname
91 logger_module = logging.getLogger(logname)
92 if "logfile" in config[k1]:
93 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
94 maxBytes=100e6, backupCount=9, delay=0)
95 file_handler.setFormatter(log_formatter_simple)
96 logger_module.addHandler(file_handler)
97 if "loglevel" in config[k1]:
98 logger_module.setLevel(config[k1]["loglevel"])
99
100 self.n2vc = N2VC(
101 log=self.logger,
102 server=config['VCA']['host'],
103 port=config['VCA']['port'],
104 user=config['VCA']['user'],
105 secret=config['VCA']['secret'],
106 # TODO: This should point to the base folder where charms are stored,
107 # if there is a common one (like object storage). Otherwise, leave
108 # it unset and pass it via DeployCharms
109 # artifacts=config['VCA'][''],
110 artifacts=None,
111 )
112 # check version of N2VC
113 # TODO enhance with int conversion or from distutils.version import LooseVersion
114 # or with list(map(int, version.split(".")))
115 if N2VC_version < "0.0.2":
116 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
117 try:
118 if config["database"]["driver"] == "mongo":
119 self.db = dbmongo.DbMongo()
120 self.db.db_connect(config["database"])
121 elif config["database"]["driver"] == "memory":
122 self.db = dbmemory.DbMemory()
123 self.db.db_connect(config["database"])
124 else:
125 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
126 config["database"]["driver"]))
127
128 if config["storage"]["driver"] == "local":
129 self.fs = fslocal.FsLocal()
130 self.fs.fs_connect(config["storage"])
131 else:
132 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
133 config["storage"]["driver"]))
134
135 if config["message"]["driver"] == "local":
136 self.msg = msglocal.MsgLocal()
137 self.msg.connect(config["message"])
138 elif config["message"]["driver"] == "kafka":
139 self.msg = msgkafka.MsgKafka()
140 self.msg.connect(config["message"])
141 else:
142 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
143 config["storage"]["driver"]))
144 except (DbException, FsException, MsgException) as e:
145 self.logger.critical(str(e), exc_info=True)
146 raise LcmException(str(e))
147
148 def update_db(self, item, _id, _desc):
149 try:
150 self.db.replace(item, _id, _desc)
151 except DbException as e:
152 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
153
154 def update_db_2(self, item, _id, _desc):
155 try:
156 self.db.set_one(item, {"_id": _id}, _desc)
157 except DbException as e:
158 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
159
160 async def vim_create(self, vim_content, order_id):
161 vim_id = vim_content["_id"]
162 logging_text = "Task vim_create={} ".format(vim_id)
163 self.logger.debug(logging_text + "Enter")
164 db_vim = None
165 exc = None
166 try:
167 step = "Getting vim from db"
168 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
169 if "_admin" not in db_vim:
170 db_vim["_admin"] = {}
171 if "deployed" not in db_vim["_admin"]:
172 db_vim["_admin"]["deployed"] = {}
173 db_vim["_admin"]["deployed"]["RO"] = None
174
175 step = "Creating vim at RO"
176 RO = ROclient.ROClient(self.loop, **self.ro_config)
177 vim_RO = deepcopy(vim_content)
178 vim_RO.pop("_id", None)
179 vim_RO.pop("_admin", None)
180 vim_RO.pop("schema_version", None)
181 vim_RO.pop("schema_type", None)
182 vim_RO.pop("vim_tenant_name", None)
183 vim_RO["type"] = vim_RO.pop("vim_type")
184 vim_RO.pop("vim_user", None)
185 vim_RO.pop("vim_password", None)
186 desc = await RO.create("vim", descriptor=vim_RO)
187 RO_vim_id = desc["uuid"]
188 db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
189 self.update_db("vim_accounts", vim_id, db_vim)
190
191 step = "Attach vim to RO tenant"
192 vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
193 "vim_username": vim_content["vim_user"],
194 "vim_password": vim_content["vim_password"],
195 "config": vim_content["config"]
196 }
197 desc = await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
198 db_vim["_admin"]["operationalState"] = "ENABLED"
199 self.update_db("vim_accounts", vim_id, db_vim)
200
201 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
202 return RO_vim_id
203
204 except (ROclient.ROClientException, DbException) as e:
205 self.logger.error(logging_text + "Exit Exception {}".format(e))
206 exc = e
207 except Exception as e:
208 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
209 exc = e
210 finally:
211 if exc and db_vim:
212 db_vim["_admin"]["operationalState"] = "ERROR"
213 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
214 self.update_db("vim_accounts", vim_id, db_vim)
215
216 async def vim_edit(self, vim_content, order_id):
217 vim_id = vim_content["_id"]
218 logging_text = "Task vim_edit={} ".format(vim_id)
219 self.logger.debug(logging_text + "Enter")
220 db_vim = None
221 exc = None
222 step = "Getting vim from db"
223 try:
224 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
225 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
226 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
227 step = "Editing vim at RO"
228 RO = ROclient.ROClient(self.loop, **self.ro_config)
229 vim_RO = deepcopy(vim_content)
230 vim_RO.pop("_id", None)
231 vim_RO.pop("_admin", None)
232 vim_RO.pop("schema_version", None)
233 vim_RO.pop("schema_type", None)
234 vim_RO.pop("vim_tenant_name", None)
235 vim_RO["type"] = vim_RO.pop("vim_type")
236 vim_RO.pop("vim_user", None)
237 vim_RO.pop("vim_password", None)
238 if vim_RO:
239 await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
240
241 step = "Editing vim-account at RO tenant"
242 vim_RO = {}
243 for k in ("vim_tenant_name", "vim_password", "config"):
244 if k in vim_content:
245 vim_RO[k] = vim_content[k]
246 if "vim_user" in vim_content:
247 vim_content["vim_username"] = vim_content["vim_user"]
248 if vim_RO:
249 await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
250 db_vim["_admin"]["operationalState"] = "ENABLED"
251 self.update_db("vim_accounts", vim_id, db_vim)
252
253 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
254 return RO_vim_id
255
256 except (ROclient.ROClientException, DbException) as e:
257 self.logger.error(logging_text + "Exit Exception {}".format(e))
258 exc = e
259 except Exception as e:
260 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
261 exc = e
262 finally:
263 if exc and db_vim:
264 db_vim["_admin"]["operationalState"] = "ERROR"
265 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
266 self.update_db("vim_accounts", vim_id, db_vim)
267
268 async def vim_delete(self, vim_id, order_id):
269 logging_text = "Task vim_delete={} ".format(vim_id)
270 self.logger.debug(logging_text + "Enter")
271 db_vim = None
272 exc = None
273 step = "Getting vim from db"
274 try:
275 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
276 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
277 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
278 RO = ROclient.ROClient(self.loop, **self.ro_config)
279 step = "Detaching vim from RO tenant"
280 try:
281 await RO.detach_datacenter(RO_vim_id)
282 except ROclient.ROClientException as e:
283 if e.http_code == 404: # not found
284 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
285 else:
286 raise
287
288 step = "Deleting vim from RO"
289 try:
290 await RO.delete("vim", RO_vim_id)
291 except ROclient.ROClientException as e:
292 if e.http_code == 404: # not found
293 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
294 else:
295 raise
296 else:
297 # nothing to delete
298 self.logger.error(logging_text + "Skipping. There is not RO information at database")
299 self.db.del_one("vim_accounts", {"_id": vim_id})
300 self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
301 return None
302
303 except (ROclient.ROClientException, DbException) as e:
304 self.logger.error(logging_text + "Exit Exception {}".format(e))
305 exc = e
306 except Exception as e:
307 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
308 exc = e
309 finally:
310 if exc and db_vim:
311 db_vim["_admin"]["operationalState"] = "ERROR"
312 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
313 self.update_db("vim_accounts", vim_id, db_vim)
314
315 async def sdn_create(self, sdn_content, order_id):
316 sdn_id = sdn_content["_id"]
317 logging_text = "Task sdn_create={} ".format(sdn_id)
318 self.logger.debug(logging_text + "Enter")
319 db_sdn = None
320 exc = None
321 try:
322 step = "Getting sdn from db"
323 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
324 if "_admin" not in db_sdn:
325 db_sdn["_admin"] = {}
326 if "deployed" not in db_sdn["_admin"]:
327 db_sdn["_admin"]["deployed"] = {}
328 db_sdn["_admin"]["deployed"]["RO"] = None
329
330 step = "Creating sdn at RO"
331 RO = ROclient.ROClient(self.loop, **self.ro_config)
332 sdn_RO = deepcopy(sdn_content)
333 sdn_RO.pop("_id", None)
334 sdn_RO.pop("_admin", None)
335 sdn_RO.pop("schema_version", None)
336 sdn_RO.pop("schema_type", None)
337 sdn_RO.pop("description", None)
338 desc = await RO.create("sdn", descriptor=sdn_RO)
339 RO_sdn_id = desc["uuid"]
340 db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
341 db_sdn["_admin"]["operationalState"] = "ENABLED"
342 self.update_db("sdns", sdn_id, db_sdn)
343 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
344 return RO_sdn_id
345
346 except (ROclient.ROClientException, DbException) as e:
347 self.logger.error(logging_text + "Exit Exception {}".format(e))
348 exc = e
349 except Exception as e:
350 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
351 exc = e
352 finally:
353 if exc and db_sdn:
354 db_sdn["_admin"]["operationalState"] = "ERROR"
355 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
356 self.update_db("sdns", sdn_id, db_sdn)
357
358 async def sdn_edit(self, sdn_content, order_id):
359 sdn_id = sdn_content["_id"]
360 logging_text = "Task sdn_edit={} ".format(sdn_id)
361 self.logger.debug(logging_text + "Enter")
362 db_sdn = None
363 exc = None
364 step = "Getting sdn from db"
365 try:
366 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
367 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
368 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
369 RO = ROclient.ROClient(self.loop, **self.ro_config)
370 step = "Editing sdn at RO"
371 sdn_RO = deepcopy(sdn_content)
372 sdn_RO.pop("_id", None)
373 sdn_RO.pop("_admin", None)
374 sdn_RO.pop("schema_version", None)
375 sdn_RO.pop("schema_type", None)
376 sdn_RO.pop("description", None)
377 if sdn_RO:
378 await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
379 db_sdn["_admin"]["operationalState"] = "ENABLED"
380 self.update_db("sdns", sdn_id, db_sdn)
381
382 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
383 return RO_sdn_id
384
385 except (ROclient.ROClientException, DbException) as e:
386 self.logger.error(logging_text + "Exit Exception {}".format(e))
387 exc = e
388 except Exception as e:
389 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
390 exc = e
391 finally:
392 if exc and db_sdn:
393 db_sdn["_admin"]["operationalState"] = "ERROR"
394 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
395 self.update_db("sdns", sdn_id, db_sdn)
396
397 async def sdn_delete(self, sdn_id, order_id):
398 logging_text = "Task sdn_delete={} ".format(sdn_id)
399 self.logger.debug(logging_text + "Enter")
400 db_sdn = None
401 exc = None
402 step = "Getting sdn from db"
403 try:
404 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
405 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
406 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
407 RO = ROclient.ROClient(self.loop, **self.ro_config)
408 step = "Deleting sdn from RO"
409 try:
410 await RO.delete("sdn", RO_sdn_id)
411 except ROclient.ROClientException as e:
412 if e.http_code == 404: # not found
413 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
414 else:
415 raise
416 else:
417 # nothing to delete
418 self.logger.error(logging_text + "Skipping. There is not RO information at database")
419 self.db.del_one("sdns", {"_id": sdn_id})
420 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
421 return None
422
423 except (ROclient.ROClientException, DbException) as e:
424 self.logger.error(logging_text + "Exit Exception {}".format(e))
425 exc = e
426 except Exception as e:
427 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
428 exc = e
429 finally:
430 if exc and db_sdn:
431 db_sdn["_admin"]["operationalState"] = "ERROR"
432 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
433 self.update_db("sdns", sdn_id, db_sdn)
434
435 def vnfd2RO(self, vnfd, new_id=None):
436 """
437 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
438 :param vnfd: input vnfd
439 :param new_id: overrides vnf id if provided
440 :return: copy of vnfd
441 """
442 ci_file = None
443 try:
444 vnfd_RO = deepcopy(vnfd)
445 vnfd_RO.pop("_id", None)
446 vnfd_RO.pop("_admin", None)
447 if new_id:
448 vnfd_RO["id"] = new_id
449 for vdu in vnfd_RO["vdu"]:
450 if "cloud-init-file" in vdu:
451 base_folder = vnfd["_admin"]["storage"]
452 clout_init_file = "{}/{}/cloud_init/{}".format(
453 base_folder["folder"],
454 base_folder["pkg-dir"],
455 vdu["cloud-init-file"]
456 )
457 ci_file = self.fs.file_open(clout_init_file, "r")
458 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
459 # convert to base 64 or similar
460 clout_init_content = ci_file.read()
461 ci_file.close()
462 ci_file = None
463 vdu.pop("cloud-init-file", None)
464 vdu["cloud-init"] = clout_init_content
465 return vnfd_RO
466 except FsException as e:
467 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
468 finally:
469 if ci_file:
470 ci_file.close()
471
472 def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, member_vnf_index,
473 task=None):
474 """
475 Callback both for charm status change and task completion
476 :param model_name: Charm model name
477 :param application_name: Charm application name
478 :param status: Can be
479 - blocked: The unit needs manual intervention
480 - maintenance: The unit is actively deploying/configuring
481 - waiting: The unit is waiting for another charm to be ready
482 - active: The unit is deployed, configured, and ready
483 - error: The charm has failed and needs attention.
484 - terminated: The charm has been destroyed
485 - removing,
486 - removed
487 :param message: detailed message error
488 :param db_nsr: nsr database content
489 :param db_nslcmop: nslcmop database content
490 :param member_vnf_index: NSD member-vnf-index
491 :param task: None for charm status change, or task for completion task callback
492 :return:
493 """
494 nsr_id = None
495 nslcmop_id = None
496 update_nsr = update_nslcmop = False
497 try:
498 nsr_id = db_nsr["_id"]
499 nslcmop_id = db_nslcmop["_id"]
500 nsr_lcm = db_nsr["_admin"]["deployed"]
501 ns_action = db_nslcmop["lcmOperationType"]
502 logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
503 member_vnf_index)
504
505 if task:
506 if task.cancelled():
507 self.logger.debug(logging_text + " task Cancelled")
508 # TODO update db_nslcmop
509 return
510
511 if task.done():
512 exc = task.exception()
513 if exc:
514 self.logger.error(logging_text + " task Exception={}".format(exc))
515 if ns_action in ("instantiate", "terminate"):
516 nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error"
517 nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc)
518 elif ns_action == "action":
519 db_nslcmop["operationState"] = "FAILED"
520 db_nslcmop["detailed-status"] = str(exc)
521 db_nslcmop["statusEnteredTime"] = time()
522 update_nslcmop = True
523 return
524
525 else:
526 self.logger.debug(logging_text + " task Done")
527 # TODO revise with Adam if action is finished and ok when task is done
528 if ns_action == "action":
529 db_nslcmop["operationState"] = "COMPLETED"
530 db_nslcmop["detailed-status"] = "Done"
531 db_nslcmop["statusEnteredTime"] = time()
532 update_nslcmop = True
533 # task is Done, but callback is still ongoing. So ignore
534 return
535 elif status:
536 self.logger.debug(logging_text + " Enter status={}".format(status))
537 if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status:
538 return # same status, ignore
539 nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status
540 nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message)
541 else:
542 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
543 return
544
545 all_active = True
546 status_map = {}
547 n2vc_error_text = [] # contain text error list. If empty no one is in error status
548 for vnf_index, vca_info in nsr_lcm["VCA"].items():
549 vca_status = vca_info["operational-status"]
550 if vca_status not in status_map:
551 # Initialize it
552 status_map[vca_status] = 0
553 status_map[vca_status] += 1
554
555 if vca_status != "active":
556 all_active = False
557 elif vca_status in ("error", "blocked"):
558 n2vc_error_text.append("member_vnf_index={} {}: {}".format(member_vnf_index, vca_status,
559 vca_info["detailed-status"]))
560
561 if all_active:
562 self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id,
563 member_vnf_index))
564 db_nsr["config-status"] = "configured"
565 db_nsr["detailed-status"] = "done"
566 db_nslcmop["operationState"] = "COMPLETED"
567 db_nslcmop["detailed-status"] = "Done"
568 db_nslcmop["statusEnteredTime"] = time()
569 elif n2vc_error_text:
570 db_nsr["config-status"] = "failed"
571 error_text = "fail configuring " + ";".join(n2vc_error_text)
572 db_nsr["detailed-status"] = error_text
573 db_nslcmop["operationState"] = "FAILED_TEMP"
574 db_nslcmop["detailed-status"] = error_text
575 db_nslcmop["statusEnteredTime"] = time()
576 else:
577 cs = "configuring: "
578 separator = ""
579 for status, num in status_map.items():
580 cs += separator + "{}: {}".format(status, num)
581 separator = ", "
582 db_nsr["config-status"] = cs
583 db_nsr["detailed-status"] = cs
584 db_nslcmop["detailed-status"] = cs
585 update_nsr = update_nslcmop = True
586
587 except Exception as e:
588 self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True)
589 finally:
590 try:
591 if update_nslcmop:
592 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
593 if update_nsr:
594 self.update_db("nsrs", nsr_id, db_nsr)
595 except Exception as e:
596 self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
597 member_vnf_index, e), exc_info=True)
598
599 def ns_params_2_RO(self, ns_params):
600 """
601 Creates a RO ns descriptor from OSM ns_instantite params
602 :param ns_params: OSM instantiate params
603 :return: The RO ns descriptor
604 """
605 vim_2_RO = {}
606
607 def vim_account_2_RO(vim_account):
608 if vim_account in vim_2_RO:
609 return vim_2_RO[vim_account]
610 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
611 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
612 # #TODO check if VIM is creating and wait
613 if db_vim["_admin"]["operationalState"] != "ENABLED":
614 raise LcmException("VIM={} is not available. operationalState={}".format(
615 vim_account, db_vim["_admin"]["operationalState"]))
616 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
617 vim_2_RO[vim_account] = RO_vim_id
618 return RO_vim_id
619
620 if not ns_params:
621 return None
622 RO_ns_params = {
623 # "name": ns_params["nsName"],
624 # "description": ns_params.get("nsDescription"),
625 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
626 # "scenario": ns_params["nsdId"],
627 "vnfs": {},
628 "networks": {},
629 }
630 if ns_params.get("ssh-authorized-key"):
631 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}
632 if ns_params.get("vnf"):
633 for vnf in ns_params["vnf"]:
634 RO_vnf = {}
635 if "vimAccountId" in vnf:
636 RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"])
637 if RO_vnf:
638 RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf
639 if ns_params.get("vld"):
640 for vld in ns_params["vld"]:
641 RO_vld = {}
642 if "ip-profile" in vld:
643 RO_vld["ip-profile"] = vld["ip-profile"]
644 if "vim-network-name" in vld:
645 RO_vld["sites"] = []
646 if isinstance(vld["vim-network-name"], dict):
647 for vim_account, vim_net in vld["vim-network-name"].items():
648 RO_vld["sites"].append({
649 "netmap-use": vim_net,
650 "datacenter": vim_account_2_RO(vim_account)
651 })
652 else: # isinstance str
653 RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]})
654 if RO_vld:
655 RO_ns_params["networks"][vld["name"]] = RO_vld
656 return RO_ns_params
657
658 async def ns_instantiate(self, nsr_id, nslcmop_id):
659 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
660 self.logger.debug(logging_text + "Enter")
661 # get all needed from database
662 db_nsr = None
663 db_nslcmop = None
664 db_vnfr = {}
665 exc = None
666 step = "Getting nsr, nslcmop, RO_vims from db"
667 try:
668 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
669 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
670 nsd = db_nsr["nsd"]
671 nsr_name = db_nsr["name"] # TODO short-name??
672 needed_vnfd = {}
673 vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
674 for c_vnf in nsd["constituent-vnfd"]:
675 vnfd_id = c_vnf["vnfd-id-ref"]
676 vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
677 db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
678 if vnfd_id not in needed_vnfd:
679 step = "Getting vnfd={} from db".format(vnfd_id)
680 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
681
682 nsr_lcm = db_nsr["_admin"].get("deployed")
683 if not nsr_lcm:
684 nsr_lcm = db_nsr["_admin"]["deployed"] = {
685 "id": nsr_id,
686 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
687 "nsr_ip": {},
688 "VCA": {},
689 }
690 db_nsr["detailed-status"] = "creating"
691 db_nsr["operational-status"] = "init"
692
693 RO = ROclient.ROClient(self.loop, **self.ro_config)
694
695 # get vnfds, instantiate at RO
696 for vnfd_id, vnfd in needed_vnfd.items():
697 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
698 self.logger.debug(logging_text + step)
699 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
700
701 # look if present
702 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
703 if vnfd_list:
704 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
705 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
706 vnfd_id, vnfd_list[0]["uuid"]))
707 else:
708 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
709 desc = await RO.create("vnfd", descriptor=vnfd_RO)
710 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
711 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
712 self.update_db("nsrs", nsr_id, db_nsr)
713
714 # create nsd at RO
715 nsd_id = nsd["id"]
716 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
717 self.logger.debug(logging_text + step)
718
719 nsd_id_RO = nsd_id + "." + nsd_id[:200]
720 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
721 if nsd_list:
722 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
723 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
724 nsd_id, nsd_list[0]["uuid"]))
725 else:
726 nsd_RO = deepcopy(nsd)
727 nsd_RO["id"] = nsd_id_RO
728 nsd_RO.pop("_id", None)
729 nsd_RO.pop("_admin", None)
730 for c_vnf in nsd_RO["constituent-vnfd"]:
731 vnfd_id = c_vnf["vnfd-id-ref"]
732 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
733 desc = await RO.create("nsd", descriptor=nsd_RO)
734 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
735 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
736 self.update_db("nsrs", nsr_id, db_nsr)
737
738 # Crate ns at RO
739 # if present use it unless in error status
740 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
741 if RO_nsr_id:
742 try:
743 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
744 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
745 desc = await RO.show("ns", RO_nsr_id)
746 except ROclient.ROClientException as e:
747 if e.http_code != HTTPStatus.NOT_FOUND:
748 raise
749 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
750 if RO_nsr_id:
751 ns_status, ns_status_info = RO.check_ns_status(desc)
752 nsr_lcm["RO"]["nsr_status"] = ns_status
753 if ns_status == "ERROR":
754 step = db_nsr["detailed-status"] = "Deleting ns at RO"
755 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
756 await RO.delete("ns", RO_nsr_id)
757 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
758 if not RO_nsr_id:
759 step = db_nsr["detailed-status"] = "Creating ns at RO"
760 self.logger.debug(logging_text + step)
761 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
762 desc = await RO.create("ns", descriptor=RO_ns_params,
763 name=db_nsr["name"],
764 scenario=nsr_lcm["RO"]["nsd_id"])
765 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
766 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
767 nsr_lcm["RO"]["nsr_status"] = "BUILD"
768
769 self.update_db("nsrs", nsr_id, db_nsr)
770 # update VNFR vimAccount
771 step = "Updating VNFR vimAcccount"
772 for vnf_index, vnfr in db_vnfr.items():
773 if vnfr.get("vim-account-id"):
774 continue
775 if db_nsr["instantiate_params"].get("vnf") and db_nsr["instantiate_params"]["vnf"].get(vnf_index) \
776 and db_nsr["instantiate_params"]["vnf"][vnf_index].get("vimAccountId"):
777 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vnf"][vnf_index]["vimAccountId"]
778 else:
779 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
780 self.update_db("vnfrs", vnfr["_id"], vnfr)
781
782 # wait until NS is ready
783 step = ns_status_detailed = "Waiting ns ready at RO"
784 db_nsr["detailed-status"] = ns_status_detailed
785 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
786 deployment_timeout = 2 * 3600 # Two hours
787 while deployment_timeout > 0:
788 desc = await RO.show("ns", RO_nsr_id)
789 ns_status, ns_status_info = RO.check_ns_status(desc)
790 nsr_lcm["RO"]["nsr_status"] = ns_status
791 if ns_status == "ERROR":
792 raise ROclient.ROClientException(ns_status_info)
793 elif ns_status == "BUILD":
794 db_nsr_detailed_status_old = db_nsr["detailed-status"]
795 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
796 if db_nsr_detailed_status_old != db_nsr["detailed-status"]:
797 self.update_db("nsrs", nsr_id, db_nsr)
798 elif ns_status == "ACTIVE":
799 step = "Waiting for management IP address from VIM"
800 try:
801 ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
802 break
803 except ROclient.ROClientException as e:
804 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
805 raise e
806 else:
807 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
808 await asyncio.sleep(5, loop=self.loop)
809 deployment_timeout -= 5
810 if deployment_timeout <= 0:
811 raise ROclient.ROClientException("Timeout waiting ns to be ready")
812 step = "Updating VNFRs"
813 for vnf_index, vnfr_deployed in ns_RO_info.items():
814 vnfr = db_vnfr[vnf_index]
815 vnfr["ip-address"] = vnfr_deployed.get("ip_address")
816 for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
817 for vdur in vnfr["vdur"]:
818 if vdur["vdu-id-ref"] == vdu_id:
819 vdur["vim-id"] = vdu_deployed.get("vim_id")
820 vdur["ip-address"] = vdu_deployed.get("ip_address")
821 break
822 self.update_db("vnfrs", vnfr["_id"], vnfr)
823
824 db_nsr["detailed-status"] = "Configuring vnfr"
825 self.update_db("nsrs", nsr_id, db_nsr)
826
827 # The parameters we'll need to deploy a charm
828 number_to_configure = 0
829
830 def deploy():
831 """An inner function to deploy the charm from either vnf or vdu
832 """
833
834 # Login to the VCA.
835 # if number_to_configure == 0:
836 # self.logger.debug("Logging into N2VC...")
837 # task = asyncio.ensure_future(self.n2vc.login())
838 # yield from asyncio.wait_for(task, 30.0)
839 # self.logger.debug("Logged into N2VC!")
840
841 # # await self.n2vc.login()
842
843 # Note: The charm needs to exist on disk at the location
844 # specified by charm_path.
845 base_folder = vnfd["_admin"]["storage"]
846 storage_params = self.fs.get_params()
847 charm_path = "{}{}/{}/charms/{}".format(
848 storage_params["path"],
849 base_folder["folder"],
850 base_folder["pkg-dir"],
851 proxy_charm
852 )
853
854 # Setup the runtime parameters for this VNF
855 params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
856
857 # ns_name will be ignored in the current version of N2VC
858 # but will be implemented for the next point release.
859 model_name = 'default'
860 application_name = self.n2vc.FormatApplicationName(
861 nsr_name,
862 vnf_index,
863 vnfd['name'],
864 )
865
866 nsr_lcm["VCA"][vnf_index] = {
867 "model": model_name,
868 "application": application_name,
869 "operational-status": "init",
870 "detailed-status": "",
871 "vnfd_id": vnfd_id,
872 }
873
874 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
875 proxy_charm))
876 task = asyncio.ensure_future(
877 self.n2vc.DeployCharms(
878 model_name, # The network service name
879 application_name, # The application name
880 vnfd, # The vnf descriptor
881 charm_path, # Path to charm
882 params, # Runtime params, like mgmt ip
883 {}, # for native charms only
884 self.n2vc_callback, # Callback for status changes
885 db_nsr, # Callback parameter
886 db_nslcmop,
887 vnf_index, # Callback parameter
888 None, # Callback parameter (task)
889 )
890 )
891 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
892 db_nsr, db_nslcmop, vnf_index))
893 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
894
895 # TODO: Make this call inside deploy()
896 # Login to the VCA. If there are multiple calls to login(),
897 # subsequent calls will be a nop and return immediately.
898 await self.n2vc.login()
899
900 step = "Looking for needed vnfd to configure"
901 self.logger.debug(logging_text + step)
902 for c_vnf in nsd["constituent-vnfd"]:
903 vnfd_id = c_vnf["vnfd-id-ref"]
904 vnf_index = str(c_vnf["member-vnf-index"])
905 vnfd = needed_vnfd[vnfd_id]
906
907 # Check if this VNF has a charm configuration
908 vnf_config = vnfd.get("vnf-configuration")
909
910 if vnf_config and vnf_config.get("juju"):
911 proxy_charm = vnf_config["juju"]["charm"]
912 params = {}
913
914 if proxy_charm:
915 if 'initial-config-primitive' in vnf_config:
916 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
917
918 deploy()
919 number_to_configure += 1
920
921 # Deploy charms for each VDU that supports one.
922 for vdu in vnfd['vdu']:
923 vdu_config = vdu.get('vdu-configuration')
924 proxy_charm = None
925 params = {}
926
927 if vdu_config and vdu_config.get("juju"):
928 proxy_charm = vdu_config["juju"]["charm"]
929
930 if 'initial-config-primitive' in vdu_config:
931 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
932
933 if proxy_charm:
934 deploy()
935 number_to_configure += 1
936
937 if number_to_configure:
938 db_nsr["config-status"] = "configuring"
939 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
940 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
941 else:
942 db_nslcmop["operationState"] = "COMPLETED"
943 db_nslcmop["detailed-status"] = "done"
944 db_nsr["config-status"] = "configured"
945 db_nsr["detailed-status"] = "done"
946 db_nsr["operational-status"] = "running"
947 self.update_db("nsrs", nsr_id, db_nsr)
948 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
949 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
950 return nsr_lcm
951
952 except (ROclient.ROClientException, DbException, LcmException) as e:
953 self.logger.error(logging_text + "Exit Exception {}".format(e))
954 exc = e
955 except Exception as e:
956 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
957 exc = e
958 finally:
959 if exc:
960 if db_nsr:
961 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
962 db_nsr["operational-status"] = "failed"
963 self.update_db("nsrs", nsr_id, db_nsr)
964 if db_nslcmop:
965 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
966 db_nslcmop["operationState"] = "FAILED"
967 db_nslcmop["statusEnteredTime"] = time()
968 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
969
970 async def ns_terminate(self, nsr_id, nslcmop_id):
971 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
972 self.logger.debug(logging_text + "Enter")
973 db_nsr = None
974 db_nslcmop = None
975 exc = None
976 step = "Getting nsr, nslcmop from db"
977 failed_detail = [] # annotates all failed error messages
978 vca_task_list = []
979 vca_task_dict = {}
980 try:
981 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
982 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
983 # nsd = db_nsr["nsd"]
984 nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
985 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
986 return
987 # TODO ALF remove
988 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
989 # #TODO check if VIM is creating and wait
990 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
991
992 db_nsr_update = {
993 "operational-status": "terminating",
994 "config-status": "terminating",
995 "detailed-status": "Deleting charms",
996 }
997 self.update_db_2("nsrs", nsr_id, db_nsr_update)
998
999 try:
1000 self.logger.debug(logging_text + step)
1001 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
1002 if deploy_info and deploy_info.get("application"):
1003 task = asyncio.ensure_future(
1004 self.n2vc.RemoveCharms(
1005 deploy_info['model'],
1006 deploy_info['application'],
1007 # self.n2vc_callback,
1008 # db_nsr,
1009 # db_nslcmop,
1010 # vnf_index,
1011 )
1012 )
1013 vca_task_list.append(task)
1014 vca_task_dict[vnf_index] = task
1015 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1016 # deploy_info['application'], None, db_nsr,
1017 # db_nslcmop, vnf_index))
1018 self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
1019 except Exception as e:
1020 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1021 # remove from RO
1022
1023 RO = ROclient.ROClient(self.loop, **self.ro_config)
1024 # Delete ns
1025 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1026 if RO_nsr_id:
1027 try:
1028 step = db_nsr["detailed-status"] = "Deleting ns at RO"
1029 self.logger.debug(logging_text + step)
1030 await RO.delete("ns", RO_nsr_id)
1031 nsr_lcm["RO"]["nsr_id"] = None
1032 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1033 except ROclient.ROClientException as e:
1034 if e.http_code == 404: # not found
1035 nsr_lcm["RO"]["nsr_id"] = None
1036 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1037 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1038 elif e.http_code == 409: # conflict
1039 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1040 self.logger.debug(logging_text + failed_detail[-1])
1041 else:
1042 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1043 self.logger.error(logging_text + failed_detail[-1])
1044
1045 # Delete nsd
1046 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1047 if RO_nsd_id:
1048 try:
1049 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
1050 await RO.delete("nsd", RO_nsd_id)
1051 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1052 nsr_lcm["RO"]["nsd_id"] = None
1053 except ROclient.ROClientException as e:
1054 if e.http_code == 404: # not found
1055 nsr_lcm["RO"]["nsd_id"] = None
1056 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1057 elif e.http_code == 409: # conflict
1058 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1059 self.logger.debug(logging_text + failed_detail[-1])
1060 else:
1061 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1062 self.logger.error(logging_text + failed_detail[-1])
1063
1064 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1065 if not RO_vnfd_id:
1066 continue
1067 try:
1068 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
1069 await RO.delete("vnfd", RO_vnfd_id)
1070 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1071 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1072 except ROclient.ROClientException as e:
1073 if e.http_code == 404: # not found
1074 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1075 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1076 elif e.http_code == 409: # conflict
1077 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1078 self.logger.debug(logging_text + failed_detail[-1])
1079 else:
1080 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1081 self.logger.error(logging_text + failed_detail[-1])
1082
1083 if vca_task_list:
1084 await asyncio.wait(vca_task_list, timeout=300)
1085 for vnf_index, task in vca_task_dict.items():
1086 if task.cancelled():
1087 failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
1088 elif task.done():
1089 exc = task.exception()
1090 if exc:
1091 failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc))
1092 else:
1093 nsr_lcm["VCA"][vnf_index] = None
1094 else: # timeout
1095 # TODO Should it be cancelled?!!
1096 task.cancel()
1097 failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))
1098
1099 if failed_detail:
1100 self.logger.error(logging_text + " ;".join(failed_detail))
1101 db_nsr_update = {
1102 "operational-status": "failed",
1103 "detailed-status": "Deletion errors " + "; ".join(failed_detail),
1104 "_admin": {"deployed": nsr_lcm, }
1105 }
1106 db_nslcmop_update = {
1107 "detailed-status": "; ".join(failed_detail),
1108 "operationState": "FAILED",
1109 "statusEnteredTime": time()
1110 }
1111 elif db_nslcmop["operationParams"].get("autoremove"):
1112 self.db.del_one("nsrs", {"_id": nsr_id})
1113 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1114 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1115 else:
1116 db_nsr_update = {
1117 "operational-status": "terminated",
1118 "detailed-status": "Done",
1119 "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
1120 }
1121 db_nslcmop_update = {
1122 "detailed-status": "Done",
1123 "operationState": "COMPLETED",
1124 "statusEnteredTime": time()
1125 }
1126 self.logger.debug(logging_text + "Exit")
1127
1128 except (ROclient.ROClientException, DbException) as e:
1129 self.logger.error(logging_text + "Exit Exception {}".format(e))
1130 exc = e
1131 except Exception as e:
1132 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1133 exc = e
1134 finally:
1135 if exc and db_nslcmop:
1136 db_nslcmop_update = {
1137 "detailed-status": "FAILED {}: {}".format(step, exc),
1138 "operationState": "FAILED",
1139 "statusEnteredTime": time(),
1140 }
1141 if db_nslcmop_update:
1142 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1143 if db_nsr_update:
1144 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1145
1146 async def ns_action(self, nsr_id, nslcmop_id):
1147 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1148 self.logger.debug(logging_text + "Enter")
1149 # get all needed from database
1150 db_nsr = None
1151 db_nslcmop = None
1152 db_nslcmop_update = None
1153 exc = None
1154 try:
1155 step = "Getting information from database"
1156 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1157 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1158 nsr_lcm = db_nsr["_admin"].get("deployed")
1159 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1160
1161 # TODO check if ns is in a proper status
1162 vca_deployed = nsr_lcm["VCA"].get(vnf_index)
1163 if not vca_deployed:
1164 raise LcmException("charm for member_vnf_index={} is not deployed".format(vnf_index))
1165 model_name = vca_deployed.get("model")
1166 application_name = vca_deployed.get("application")
1167 if not model_name or not application_name:
1168 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(vnf_index))
1169 if vca_deployed["operational-status"] != "active":
1170 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1171 vnf_index, vca_deployed["operational-status"]))
1172 primitive = db_nslcmop["operationParams"]["primitive"]
1173 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1174 callback = None # self.n2vc_callback
1175 callback_args = () # [db_nsr, db_nslcmop, vnf_index, None]
1176 await self.n2vc.login()
1177 task = asyncio.ensure_future(
1178 self.n2vc.ExecutePrimitive(
1179 model_name,
1180 application_name,
1181 primitive, callback,
1182 *callback_args,
1183 **primitive_params
1184 )
1185 )
1186 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1187 # db_nsr, db_nslcmop, vnf_index))
1188 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1189 # wait until completed with timeout
1190 await asyncio.wait((task,), timeout=300)
1191
1192 result = "FAILED" # by default
1193 result_detail = ""
1194 if task.cancelled():
1195 db_nslcmop["detailed-status"] = "Task has been cancelled"
1196 elif task.done():
1197 exc = task.exception()
1198 if exc:
1199 result_detail = str(exc)
1200 else:
1201 self.logger.debug(logging_text + " task Done")
1202 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1203 result = "COMPLETED"
1204 result_detail = "Done"
1205 else: # timeout
1206 # TODO Should it be cancelled?!!
1207 task.cancel()
1208 result_detail = "timeout"
1209
1210 db_nslcmop_update = {
1211 "detailed-status": result_detail,
1212 "operationState": result,
1213 "statusEnteredTime": time()
1214 }
1215 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1216 return # database update is called inside finally
1217
1218 except (DbException, LcmException) as e:
1219 self.logger.error(logging_text + "Exit Exception {}".format(e))
1220 exc = e
1221 except Exception as e:
1222 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1223 exc = e
1224 finally:
1225 if exc and db_nslcmop:
1226 db_nslcmop_update = {
1227 "detailed-status": "FAILED {}: {}".format(step, exc),
1228 "operationState": "FAILED",
1229 "statusEnteredTime": time(),
1230 }
1231 if db_nslcmop_update:
1232 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1233
1234 async def test(self, param=None):
1235 self.logger.debug("Starting/Ending test task: {}".format(param))
1236
1237 def cancel_tasks(self, topic, _id):
1238 """
1239 Cancel all active tasks of a concrete nsr or vim identified for _id
1240 :param topic: can be ns or vim_account
1241 :param _id: nsr or vim identity
1242 :return: None, or raises an exception if not possible
1243 """
1244 if topic == "ns":
1245 lcm_tasks = self.lcm_ns_tasks
1246 elif topic == "vim_account":
1247 lcm_tasks = self.lcm_vim_tasks
1248 elif topic == "sdn":
1249 lcm_tasks = self.lcm_sdn_tasks
1250
1251 if not lcm_tasks.get(_id):
1252 return
1253 for order_id, tasks_set in lcm_tasks[_id].items():
1254 for task_name, task in tasks_set.items():
1255 result = task.cancel()
1256 if result:
1257 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
1258 lcm_tasks[_id] = {}
1259
1260 async def kafka_ping(self):
1261 self.logger.debug("Task kafka_ping Enter")
1262 consecutive_errors = 0
1263 first_start = True
1264 kafka_has_received = False
1265 self.pings_not_received = 1
1266 while True:
1267 try:
1268 await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
1269 # time between pings are low when it is not received and at starting
1270 wait_time = 5 if not kafka_has_received else 120
1271 if not self.pings_not_received:
1272 kafka_has_received = True
1273 self.pings_not_received += 1
1274 await asyncio.sleep(wait_time, loop=self.loop)
1275 if self.pings_not_received > 10:
1276 raise LcmException("It is not receiving pings from Kafka bus")
1277 consecutive_errors = 0
1278 first_start = False
1279 except LcmException:
1280 raise
1281 except Exception as e:
1282 # if not first_start is the first time after starting. So leave more time and wait
1283 # to allow kafka starts
1284 if consecutive_errors == 8 if not first_start else 30:
1285 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1286 raise
1287 consecutive_errors += 1
1288 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1289 wait_time = 1 if not first_start else 5
1290 await asyncio.sleep(wait_time, loop=self.loop)
1291
1292 async def kafka_read(self):
1293 self.logger.debug("Task kafka_read Enter")
1294 order_id = 1
1295 # future = asyncio.Future()
1296 consecutive_errors = 0
1297 first_start = True
1298 while consecutive_errors < 10:
1299 try:
1300 topics = ("admin", "ns", "vim_account", "sdn")
1301 topic, command, params = await self.msg.aioread(topics, self.loop)
1302 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
1303 consecutive_errors = 0
1304 first_start = False
1305 order_id += 1
1306 if command == "exit":
1307 print("Bye!")
1308 break
1309 elif command.startswith("#"):
1310 continue
1311 elif command == "echo":
1312 # just for test
1313 print(params)
1314 sys.stdout.flush()
1315 continue
1316 elif command == "test":
1317 asyncio.Task(self.test(params), loop=self.loop)
1318 continue
1319
1320 if topic == "admin":
1321 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
1322 self.pings_not_received = 0
1323 continue
1324 elif topic == "ns":
1325 if command == "instantiate":
1326 # self.logger.debug("Deploying NS {}".format(nsr_id))
1327 nslcmop = params
1328 nslcmop_id = nslcmop["_id"]
1329 nsr_id = nslcmop["nsInstanceId"]
1330 task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
1331 if nsr_id not in self.lcm_ns_tasks:
1332 self.lcm_ns_tasks[nsr_id] = {}
1333 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task}
1334 continue
1335 elif command == "terminate":
1336 # self.logger.debug("Deleting NS {}".format(nsr_id))
1337 nslcmop = params
1338 nslcmop_id = nslcmop["_id"]
1339 nsr_id = nslcmop["nsInstanceId"]
1340 self.cancel_tasks(topic, nsr_id)
1341 task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
1342 if nsr_id not in self.lcm_ns_tasks:
1343 self.lcm_ns_tasks[nsr_id] = {}
1344 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task}
1345 continue
1346 elif command == "action":
1347 # self.logger.debug("Update NS {}".format(nsr_id))
1348 nslcmop = params
1349 nslcmop_id = nslcmop["_id"]
1350 nsr_id = nslcmop["nsInstanceId"]
1351 task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
1352 if nsr_id not in self.lcm_ns_tasks:
1353 self.lcm_ns_tasks[nsr_id] = {}
1354 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task}
1355 continue
1356 elif command == "show":
1357 try:
1358 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1359 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
1360 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
1361 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
1362 db_nsr["detailed-status"],
1363 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
1364 except Exception as e:
1365 print("nsr {} not found: {}".format(nsr_id, e))
1366 sys.stdout.flush()
1367 continue
1368 elif command == "deleted":
1369 continue # TODO cleaning of task just in case should be done
1370 elif topic == "vim_account":
1371 vim_id = params["_id"]
1372 if command == "create":
1373 task = asyncio.ensure_future(self.vim_create(params, order_id))
1374 if vim_id not in self.lcm_vim_tasks:
1375 self.lcm_vim_tasks[vim_id] = {}
1376 self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task}
1377 continue
1378 elif command == "delete":
1379 self.cancel_tasks(topic, vim_id)
1380 task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
1381 if vim_id not in self.lcm_vim_tasks:
1382 self.lcm_vim_tasks[vim_id] = {}
1383 self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task}
1384 continue
1385 elif command == "show":
1386 print("not implemented show with vim_account")
1387 sys.stdout.flush()
1388 continue
1389 elif command == "edit":
1390 task = asyncio.ensure_future(self.vim_edit(vim_id, order_id))
1391 if vim_id not in self.lcm_vim_tasks:
1392 self.lcm_vim_tasks[vim_id] = {}
1393 self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task}
1394 continue
1395 elif topic == "sdn":
1396 _sdn_id = params["_id"]
1397 if command == "create":
1398 task = asyncio.ensure_future(self.sdn_create(params, order_id))
1399 if _sdn_id not in self.lcm_sdn_tasks:
1400 self.lcm_sdn_tasks[_sdn_id] = {}
1401 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task}
1402 continue
1403 elif command == "delete":
1404 self.cancel_tasks(topic, _sdn_id)
1405 task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
1406 if _sdn_id not in self.lcm_sdn_tasks:
1407 self.lcm_sdn_tasks[_sdn_id] = {}
1408 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task}
1409 continue
1410 elif command == "edit":
1411 task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id))
1412 if _sdn_id not in self.lcm_sdn_tasks:
1413 self.lcm_sdn_tasks[_sdn_id] = {}
1414 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task}
1415 continue
1416 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1417 except Exception as e:
1418 # if not first_start is the first time after starting. So leave more time and wait
1419 # to allow kafka starts
1420 if consecutive_errors == 8 if not first_start else 30:
1421 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1422 raise
1423 consecutive_errors += 1
1424 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1425 wait_time = 2 if not first_start else 5
1426 await asyncio.sleep(wait_time, loop=self.loop)
1427
1428 # self.logger.debug("Task kafka_read terminating")
1429 self.logger.debug("Task kafka_read exit")
1430
1431 def start(self):
1432 self.loop = asyncio.get_event_loop()
1433 self.loop.run_until_complete(asyncio.gather(
1434 self.kafka_read(),
1435 self.kafka_ping()
1436 ))
1437 # TODO
1438 # self.logger.debug("Terminating cancelling creation tasks")
1439 # self.cancel_tasks("ALL", "create")
1440 # timeout = 200
1441 # while self.is_pending_tasks():
1442 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1443 # await asyncio.sleep(2, loop=self.loop)
1444 # timeout -= 2
1445 # if not timeout:
1446 # self.cancel_tasks("ALL", "ALL")
1447 self.loop.close()
1448 self.loop = None
1449 if self.db:
1450 self.db.db_disconnect()
1451 if self.msg:
1452 self.msg.disconnect()
1453 if self.fs:
1454 self.fs.fs_disconnect()
1455
1456 def read_config_file(self, config_file):
1457 # TODO make a [ini] + yaml inside parser
1458 # the configparser library is not suitable, because it does not admit comments at the end of line,
1459 # and not parse integer or boolean
1460 try:
1461 with open(config_file) as f:
1462 conf = yaml.load(f)
1463 for k, v in environ.items():
1464 if not k.startswith("OSMLCM_"):
1465 continue
1466 k_items = k.lower().split("_")
1467 c = conf
1468 try:
1469 for k_item in k_items[1:-1]:
1470 if k_item in ("ro", "vca"):
1471 # put in capital letter
1472 k_item = k_item.upper()
1473 c = c[k_item]
1474 if k_items[-1] == "port":
1475 c[k_items[-1]] = int(v)
1476 else:
1477 c[k_items[-1]] = v
1478 except Exception as e:
1479 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1480
1481 return conf
1482 except Exception as e:
1483 self.logger.critical("At config file '{}': {}".format(config_file, e))
1484 exit(1)
1485
1486
1487 def usage():
1488 print("""Usage: {} [options]
1489 -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
1490 -h|--help: shows this help
1491 """.format(sys.argv[0]))
1492 # --log-socket-host HOST: send logs to this host")
1493 # --log-socket-port PORT: send logs using this port (default: 9022)")
1494
1495
1496 if __name__ == '__main__':
1497 try:
1498 # load parameters and configuration
1499 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help"])
1500 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
1501 config_file = None
1502 for o, a in opts:
1503 if o in ("-h", "--help"):
1504 usage()
1505 sys.exit()
1506 elif o in ("-c", "--config"):
1507 config_file = a
1508 # elif o == "--log-socket-port":
1509 # log_socket_port = a
1510 # elif o == "--log-socket-host":
1511 # log_socket_host = a
1512 # elif o == "--log-file":
1513 # log_file = a
1514 else:
1515 assert False, "Unhandled option"
1516 if config_file:
1517 if not path.isfile(config_file):
1518 print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
1519 exit(1)
1520 else:
1521 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
1522 if path.isfile(config_file):
1523 break
1524 else:
1525 print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
1526 exit(1)
1527 lcm = Lcm(config_file)
1528 lcm.start()
1529 except getopt.GetoptError as e:
1530 print(str(e), file=sys.stderr)
1531 # usage()
1532 exit(1)