Initial LCM contribution
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 from osm_common import dbmemory
8 from osm_common import dbmongo
9 from osm_common import fslocal
10 from osm_common import msglocal
11 from osm_common import msgkafka
12 import logging
13 import functools
14 import sys
15 from osm_common.dbbase import DbException
16 from osm_common.fsbase import FsException
17 from osm_common.msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 from n2vc import version as N2VC_version
22 # import os.path
23 # import time
24
25 from copy import deepcopy
26 from http import HTTPStatus
27 from time import time
28
29
class LcmException(Exception):
    """Error raised by the LCM code itself, e.g. on invalid configuration or unreadable descriptor files."""
32
33
34 class Lcm:
35
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration file reference, handed to self.read_config_file(). The loaded configuration
        must be a two level dictionary containing at least 'global', 'database', 'storage', 'message', 'RO'
        and 'VCA'
    :return: None
    :raises LcmException: if a driver name is not supported, the N2VC version is too old, or the connection to
        database, storage or messaging fails
    """
    # 'import logging' alone does not guarantee that the 'logging.handlers' submodule is loaded,
    # so the handler class is imported explicitly here
    from logging.handlers import RotatingFileHandler

    self.db = None
    self.msg = None
    self.fs = None
    self.pings_not_received = 1

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging: the 'lcm' logger writes to a rotating file if 'logfile' is configured, otherwise to a stream handler
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = RotatingFileHandler(config["global"]["logfile"],
                                           maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules: each section gets its logger name, and optionally its own file and level
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = RotatingFileHandler(config[k1]["logfile"],
                                               maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )
    # check version of N2VC
    # TODO enhance with int conversion or from distutils.version import LooseVersion
    # or with list(map(int, version.split(".")))
    # NOTE(review): this is a lexicographic string comparison, e.g. "0.0.10" < "0.0.2" is True
    if N2VC_version < "0.0.2":
        raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
    try:
        if config["database"]["driver"] == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif config["database"]["driver"] == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                config["database"]["driver"]))

        if config["storage"]["driver"] == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                config["storage"]["driver"]))

        if config["message"]["driver"] == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif config["message"]["driver"] == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending value of the 'message' section (not the 'storage' one)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                config["message"]["driver"]))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e)) from e
145
def update_db(self, item, _id, _desc):
    """
    Replaces a database entry through self.db.replace(). A DbException is logged as error, not raised
    :param item: first argument for self.db.replace(); callers in this file pass names such as "nsrs" or "vnfrs"
    :param _id: identifier of the entry
    :param _desc: new content for the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        failure_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(failure_text)
151
def update_db_2(self, item, _id, _desc):
    """
    Updates a database entry through self.db.set_one(), selecting it by "_id". A DbException is logged as
    error, not raised
    :param item: first argument for self.db.set_one()
    :param _id: identifier used to build the {"_id": _id} filter
    :param _desc: content passed as third argument to self.db.set_one()
    :return: None
    """
    entry_filter = {"_id": _id}
    try:
        self.db.set_one(item, entry_filter, _desc)
    except DbException as db_error:
        failure_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(failure_text)
157
async def vim_create(self, vim_content, order_id):
    """
    Creates a vim at RO, attaches it to the RO tenant and records the result at the "vim_accounts" database entry
    :param vim_content: dictionary with the vim account. This method reads '_id', 'vim_type', 'vim_tenant_name',
        'vim_user', 'vim_password' and 'config'
    :param order_id: not used by this method
    :return: the RO uuid of the created vim. None if something fails: the exception is logged and stored at the
        database entry ("operationalState": "ERROR") instead of being raised
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deployed" not in db_vim["_admin"]:
            db_vim["_admin"]["deployed"] = {}
        db_vim["_admin"]["deployed"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # the vim descriptor for RO is the OSM content without internal fields and without the
        # tenant/credentials, that are sent later on when attaching
        vim_RO = deepcopy(vim_content)
        vim_RO.pop("_id", None)
        vim_RO.pop("_admin", None)
        vim_RO.pop("schema_version", None)
        vim_RO.pop("schema_type", None)
        vim_RO.pop("vim_tenant_name", None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        vim_RO.pop("vim_user", None)
        vim_RO.pop("vim_password", None)
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        # store the RO id as soon as it is known
        db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
        self.update_db("vim_accounts", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
                  "vim_username": vim_content["vim_user"],
                  "vim_password": vim_content["vim_password"],
                  "config": vim_content["config"]
                  }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vim_accounts", vim_id, db_vim)

        # the placeholder was missing, so the id never appeared at the log
        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # on failure, record at database the step that failed
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
213
async def vim_edit(self, vim_content, order_id):
    """
    Edits at RO a vim and its vim-account, and records the result at the "vim_accounts" database entry.
    Nothing is sent to RO if the database entry has no RO id at "_admin.deployed.RO"
    :param vim_content: dictionary with '_id' and the vim account fields to edit
    :param order_id: not used by this method
    :return: the RO uuid of the vim, or None if the entry has no RO id. None also if something fails: the
        exception is logged and stored at the database entry ("operationalState": "ERROR") instead of being raised
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # stays None when the vim is not deployed at RO
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            # an edition can be partial, so 'vim_type' may be absent
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_account_RO = {}
            for k in ("vim_tenant_name", "vim_password", "config"):
                if k in vim_content:
                    vim_account_RO[k] = vim_content[k]
            # the user goes at the descriptor sent to RO (it was wrongly written into the input vim_content)
            if "vim_user" in vim_content:
                vim_account_RO["vim_username"] = vim_content["vim_user"]
            if vim_account_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # on failure, record at database the step that failed; the entry may lack "_admin"
        if exc and db_vim:
            db_admin = db_vim.setdefault("_admin", {})
            db_admin["operationalState"] = "ERROR"
            db_admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
265
async def vim_delete(self, vim_id, order_id):
    """
    Detaches and deletes a vim at RO (if the database entry has an RO id at "_admin.deployed.RO") and then
    deletes the entry from "vim_accounts". A 404 answer from RO is treated as already detached/deleted
    :param vim_id: "_id" of the vim account at database
    :param order_id: not used by this method
    :return: None. Failures are logged and stored at the database entry ("operationalState": "ERROR") instead of
        being raised
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vim_accounts", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # on failure, record at database the step that failed. The entry may lack "_admin" (that is
        # precisely the "no RO information" case), so it is created instead of raising KeyError here
        if exc and db_vim:
            db_admin = db_vim.setdefault("_admin", {})
            db_admin["operationalState"] = "ERROR"
            db_admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
312
async def sdn_create(self, sdn_content, order_id):
    """
    Creates a sdn at RO and records the result at the "sdns" database entry
    :param sdn_content: dictionary with the sdn; '_id' is mandatory. Internal fields and 'description' are not
        sent to RO
    :param order_id: not used by this method
    :return: the RO uuid of the created sdn. None if something fails: the exception is logged and stored at the
        database entry ("operationalState": "ERROR") instead of being raised
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deployed" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deployed"] = {}
        db_sdn["_admin"]["deployed"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        sdn_RO.pop("description", None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        # the placeholder was missing, so the id never appeared at the log
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # on failure, record at database the step that failed
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
355
async def sdn_edit(self, sdn_content, order_id):
    """
    Edits a sdn at RO and records the result at the "sdns" database entry.
    Nothing is sent to RO if the database entry has no RO id at "_admin.deployed.RO"
    :param sdn_content: dictionary with '_id' and the sdn fields to edit
    :param order_id: not used by this method
    :return: the RO uuid of the sdn, or None if the entry has no RO id. None also if something fails: the
        exception is logged and stored at the database entry ("operationalState": "ERROR") instead of being raised
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # stays None when the sdn is not deployed at RO
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # on failure, record at database the step that failed; the entry may lack "_admin"
        if exc and db_sdn:
            db_admin = db_sdn.setdefault("_admin", {})
            db_admin["operationalState"] = "ERROR"
            db_admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
394
async def sdn_delete(self, sdn_id, order_id):
    """
    Deletes a sdn at RO (if the database entry has an RO id at "_admin.deployed.RO") and then deletes the
    entry from "sdns". A 404 answer from RO is treated as already deleted
    :param sdn_id: "_id" of the sdn at database
    :param order_id: not used by this method
    :return: None. Failures are logged and stored at the database entry ("operationalState": "ERROR") instead of
        being raised
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # on failure, record at database the step that failed. The entry may lack "_admin" (that is
        # precisely the "no RO information" case), so it is created instead of raising KeyError here
        if exc and db_sdn:
            db_admin = db_sdn.setdefault("_admin", {})
            db_admin["operationalState"] = "ERROR"
            db_admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
432
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor for RO out of an OSM IM vnfd. The input is not modified: a deep copy is
    returned without "_id" and "_admin", and with every vdu "cloud-init-file" replaced by a "cloud-init" entry
    holding the content of that file
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd
    :raises LcmException: if a cloud-init file cannot be read from storage
    """
    opened_file = None
    try:
        ro_descriptor = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_descriptor.pop(internal_key, None)
        if new_id:
            ro_descriptor["id"] = new_id
        for vdu in ro_descriptor["vdu"]:
            if "cloud-init-file" not in vdu:
                continue
            # storage location is taken from the original vnfd, the copy has lost "_admin"
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu["cloud-init-file"]
            )
            opened_file = self.fs.file_open(cloud_init_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            # convert to base 64 or similar
            cloud_init_text = opened_file.read()
            opened_file.close()
            opened_file = None
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_text
        return ro_descriptor
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        # only still set if reading failed before the explicit close above
        if opened_file:
            opened_file.close()
468
def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, vnf_member_index, task=None):
    """
    Callback both for charm status change and task completion.
    It stores the new status of the charm of 'vnf_member_index' at db_nsr["_admin"]["deployed"]["VCA"], computes
    the aggregated status of all the charms of the ns, and writes db_nsr and db_nslcmop to database
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param db_nsr: nsr database content
    :param db_nslcmop: nslcmop database content
    :param vnf_member_index: NSD vnf-member-index
    :param task: None for charm status change, or task for completion task callback
    :return: None. Exceptions are logged, not raised
    """
    nsr_id = None
    nslcmop_id = None
    update_nsr = update_nslcmop = False
    try:
        nsr_id = db_nsr["_id"]
        nslcmop_id = db_nslcmop["_id"]
        nsr_lcm = db_nsr["_admin"]["deployed"]
        ns_action = db_nslcmop["lcmOperationType"]
        logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
                                                                              vnf_member_index)

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                # TODO update db_nslcmop
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    if ns_action in ("instantiate", "terminate"):
                        # mark this charm as failed and continue to the aggregation below
                        nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                        nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                    elif ns_action == "action":
                        db_nslcmop["operationState"] = "FAILED"
                        db_nslcmop["detailed-status"] = str(exc)
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                        return

                else:
                    self.logger.debug(logging_text + " task Done")
                    # TODO revise with Adam if action is finished and ok when task is done
                    if ns_action == "action":
                        db_nslcmop["operationState"] = "COMPLETED"
                        db_nslcmop["detailed-status"] = "Done"
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={}".format(status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = status
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return

        # aggregate the status of the charms of all the vnfs of this ns
        all_active = True
        status_map = {}  # number of charms at each status
        n2vc_error_text = []  # contain text error list. If empty no one is in error status
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            # independent check: as an 'elif' of the previous one it could never be reached
            if vca_status in ("error", "blocked"):
                # report the index of the failing charm, that can be other than the one of this callback
                n2vc_error_text.append("member_vnf_index={} {}: {}".format(vnf_index, vca_status,
                                                                           vca_info["detailed-status"]))

        if all_active:
            self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index))
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailed-status"] = "Done"
            db_nslcmop["statusEnteredTime"] = time()
        elif n2vc_error_text:
            db_nsr["config-status"] = "failed"
            error_text = "fail configuring " + ";".join(n2vc_error_text)
            db_nsr["detailed-status"] = error_text
            db_nslcmop["operationState"] = "FAILED_TEMP"
            db_nslcmop["detailed-status"] = error_text
            db_nslcmop["statusEnteredTime"] = time()
        else:
            # still in progress: summarize how many charms are at each status
            cs = "configuring: " + ", ".join(
                "{}: {}".format(status_name, num) for status_name, num in status_map.items())
            db_nsr["config-status"] = cs
            db_nsr["detailed-status"] = cs
            db_nslcmop["detailed-status"] = cs
        update_nsr = update_nslcmop = True

    except Exception as e:
        self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
    finally:
        # write to database whatever was flagged, also when returning early or after an exception
        try:
            if update_nslcmop:
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
            if update_nsr:
                self.update_db("nsrs", nsr_id, db_nsr)
        except Exception as e:
            self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
                vnf_member_index, e), exc_info=True)
593
def ns_params_2_RO(self, ns_params):
    """
    Builds the RO ns descriptor that corresponds to the OSM ns instantiate params
    :param ns_params: OSM instantiate params
    :return: The RO ns descriptor; None if ns_params is empty
    :raises LcmException: if a referenced vim account is not at operationalState "ENABLED"
    """
    RO_id_by_vim_account = {}  # cache, so that each vim account is read from database only once

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account _id into the id of the vim at RO
        try:
            return RO_id_by_vim_account[vim_account]
        except KeyError:
            pass
        vim_record = self.db.get_one("vim_accounts", {"_id": vim_account})
        vim_state = vim_record["_admin"]["operationalState"]
        # TODO check if VIM is creating (operationalState "PROCESSING") and wait
        if vim_state != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, vim_state))
        RO_vim_id = vim_record["_admin"]["deployed"]["RO"]
        RO_id_by_vim_account[vim_account] = RO_vim_id
        return RO_vim_id

    if not ns_params:
        return None

    # "name", "description" and "scenario" are not filled here
    RO_ns_params = {
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "vnfs": {},
        "networks": {},
    }

    ssh_keys = ns_params.get("ssh-authorized-key")
    if ssh_keys:
        RO_ns_params["cloud-config"] = {"key-pairs": ssh_keys}

    # only the vnfs that override the vim account get an entry
    for vnf in ns_params.get("vnf") or ():
        if "vimAccountId" not in vnf:
            continue
        RO_vnf = {"datacenter": vim_account_2_RO(vnf["vimAccountId"])}
        RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf

    # only the vlds that provide some instantiation parameter get an entry
    for vld in ns_params.get("vld") or ():
        RO_vld = {}
        if "ip-profile" in vld:
            RO_vld["ip-profile"] = vld["ip-profile"]
        if "vim-network-name" in vld:
            vim_network = vld["vim-network-name"]
            if isinstance(vim_network, dict):
                # one vim network name per vim account
                RO_vld["sites"] = [
                    {"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                    for vim_account, vim_net in vim_network.items()
                ]
            else:
                # a single name, isinstance str
                RO_vld["sites"] = [{"netmap-use": vim_network}]
        if RO_vld:
            RO_ns_params["networks"][vld["name"]] = RO_vld
    return RO_ns_params
651
652 async def ns_instantiate(self, nsr_id, nslcmop_id):
653 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
654 self.logger.debug(logging_text + "Enter")
655 # get all needed from database
656 db_nsr = None
657 db_nslcmop = None
658 db_vnfr = {}
659 exc = None
660 step = "Getting nsr, nslcmop, RO_vims from db"
661 try:
662 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
663 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
664 nsd = db_nsr["nsd"]
665 nsr_name = db_nsr["name"] # TODO short-name??
666 needed_vnfd = {}
667 vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
668 for c_vnf in nsd["constituent-vnfd"]:
669 vnfd_id = c_vnf["vnfd-id-ref"]
670 vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
671 db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
672 if vnfd_id not in needed_vnfd:
673 step = "Getting vnfd={} from db".format(vnfd_id)
674 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
675
676 nsr_lcm = db_nsr["_admin"].get("deployed")
677 if not nsr_lcm:
678 nsr_lcm = db_nsr["_admin"]["deployed"] = {
679 "id": nsr_id,
680 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
681 "nsr_ip": {},
682 "VCA": {},
683 }
684 db_nsr["detailed-status"] = "creating"
685 db_nsr["operational-status"] = "init"
686
687 RO = ROclient.ROClient(self.loop, **self.ro_config)
688
689 # get vnfds, instantiate at RO
690 for vnfd_id, vnfd in needed_vnfd.items():
691 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
692 self.logger.debug(logging_text + step)
693 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
694
695 # look if present
696 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
697 if vnfd_list:
698 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
699 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
700 vnfd_id, vnfd_list[0]["uuid"]))
701 else:
702 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
703 desc = await RO.create("vnfd", descriptor=vnfd_RO)
704 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
705 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
706 self.update_db("nsrs", nsr_id, db_nsr)
707
708 # create nsd at RO
709 nsd_id = nsd["id"]
710 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
711 self.logger.debug(logging_text + step)
712
713 nsd_id_RO = nsd_id + "." + nsd_id[:200]
714 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
715 if nsd_list:
716 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
717 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
718 nsd_id, nsd_list[0]["uuid"]))
719 else:
720 nsd_RO = deepcopy(nsd)
721 nsd_RO["id"] = nsd_id_RO
722 nsd_RO.pop("_id", None)
723 nsd_RO.pop("_admin", None)
724 for c_vnf in nsd_RO["constituent-vnfd"]:
725 vnfd_id = c_vnf["vnfd-id-ref"]
726 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
727 desc = await RO.create("nsd", descriptor=nsd_RO)
728 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
729 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
730 self.update_db("nsrs", nsr_id, db_nsr)
731
732 # Crate ns at RO
733 # if present use it unless in error status
734 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
735 if RO_nsr_id:
736 try:
737 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
738 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
739 desc = await RO.show("ns", RO_nsr_id)
740 except ROclient.ROClientException as e:
741 if e.http_code != HTTPStatus.NOT_FOUND:
742 raise
743 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
744 if RO_nsr_id:
745 ns_status, ns_status_info = RO.check_ns_status(desc)
746 nsr_lcm["RO"]["nsr_status"] = ns_status
747 if ns_status == "ERROR":
748 step = db_nsr["detailed-status"] = "Deleting ns at RO"
749 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
750 await RO.delete("ns", RO_nsr_id)
751 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
752 if not RO_nsr_id:
753 step = db_nsr["detailed-status"] = "Creating ns at RO"
754 self.logger.debug(logging_text + step)
755 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
756 desc = await RO.create("ns", descriptor=RO_ns_params,
757 name=db_nsr["name"],
758 scenario=nsr_lcm["RO"]["nsd_id"])
759 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
760 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
761 nsr_lcm["RO"]["nsr_status"] = "BUILD"
762
763 self.update_db("nsrs", nsr_id, db_nsr)
764 # update VNFR vimAccount
765 step = "Updating VNFR vimAcccount"
766 for vnf_index, vnfr in db_vnfr.items():
767 if vnfr.get("vim-account-id"):
768 continue
769 if db_nsr["instantiate_params"].get("vnf") and db_nsr["instantiate_params"]["vnf"].get(vnf_index) \
770 and db_nsr["instantiate_params"]["vnf"][vnf_index].get("vimAccountId"):
771 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vnf"][vnf_index]["vimAccountId"]
772 else:
773 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
774 self.update_db("vnfrs", vnfr["_id"], vnfr)
775
776 # wait until NS is ready
777 step = ns_status_detailed = "Waiting ns ready at RO"
778 db_nsr["detailed-status"] = ns_status_detailed
779 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
780 deployment_timeout = 2*3600 # Two hours
781 while deployment_timeout > 0:
782 desc = await RO.show("ns", RO_nsr_id)
783 ns_status, ns_status_info = RO.check_ns_status(desc)
784 nsr_lcm["RO"]["nsr_status"] = ns_status
785 if ns_status == "ERROR":
786 raise ROclient.ROClientException(ns_status_info)
787 elif ns_status == "BUILD":
788 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
789 self.update_db("nsrs", nsr_id, db_nsr)
790 elif ns_status == "ACTIVE":
791 step = "Getting ns VIM information"
792 ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
793 break
794 else:
795 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
796
797 await asyncio.sleep(5, loop=self.loop)
798 deployment_timeout -= 5
799 if deployment_timeout <= 0:
800 raise ROclient.ROClientException("Timeout waiting ns to be ready")
801 step = "Updating VNFRs"
802 for vnf_index, vnfr_deployed in ns_RO_info.items():
803 vnfr = db_vnfr[vnf_index]
804 vnfr["ip-address"] = vnfr_deployed.get("ip_address")
805 for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
806 for vdur in vnfr["vdur"]:
807 if vdur["vdu-id-ref"] == vdu_id:
808 vdur["vim-id"] = vdu_deployed.get("vim_id")
809 vdur["ip-address"] = vdu_deployed.get("ip_address")
810 break
811 self.update_db("vnfrs", vnfr["_id"], vnfr)
812
813 db_nsr["detailed-status"] = "Configuring vnfr"
814 self.update_db("nsrs", nsr_id, db_nsr)
815
816 # The parameters we'll need to deploy a charm
817 number_to_configure = 0
818
819 def deploy():
820 """An inner function to deploy the charm from either vnf or vdu
821 """
822
823 # Login to the VCA.
824 # if number_to_configure == 0:
825 # self.logger.debug("Logging into N2VC...")
826 # task = asyncio.ensure_future(self.n2vc.login())
827 # yield from asyncio.wait_for(task, 30.0)
828 # self.logger.debug("Logged into N2VC!")
829
830 ## await self.n2vc.login()
831
832 # Note: The charm needs to exist on disk at the location
833 # specified by charm_path.
834 base_folder = vnfd["_admin"]["storage"]
835 storage_params = self.fs.get_params()
836 charm_path = "{}{}/{}/charms/{}".format(
837 storage_params["path"],
838 base_folder["folder"],
839 base_folder["pkg-dir"],
840 proxy_charm
841 )
842
843 # Setup the runtime parameters for this VNF
844 params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
845
846 # ns_name will be ignored in the current version of N2VC
847 # but will be implemented for the next point release.
848 model_name = 'default'
849 application_name = self.n2vc.FormatApplicationName(
850 nsr_name,
851 vnf_index,
852 vnfd['name'],
853 )
854
855 nsr_lcm["VCA"][vnf_index] = {
856 "model": model_name,
857 "application": application_name,
858 "operational-status": "init",
859 "detailed-status": "",
860 "vnfd_id": vnfd_id,
861 }
862
863 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
864 task = asyncio.ensure_future(
865 self.n2vc.DeployCharms(
866 model_name, # The network service name
867 application_name, # The application name
868 vnfd, # The vnf descriptor
869 charm_path, # Path to charm
870 params, # Runtime params, like mgmt ip
871 {}, # for native charms only
872 self.n2vc_callback, # Callback for status changes
873 db_nsr, # Callback parameter
874 db_nslcmop,
875 vnf_index, # Callback parameter
876 None, # Callback parameter (task)
877 )
878 )
879 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
880 db_nsr, db_nslcmop, vnf_index))
881 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
882
883 # TODO: Make this call inside deploy()
884 # Login to the VCA. If there are multiple calls to login(),
885 # subsequent calls will be a nop and return immediately.
886 await self.n2vc.login()
887
888 step = "Looking for needed vnfd to configure"
889 self.logger.debug(logging_text + step)
890 for c_vnf in nsd["constituent-vnfd"]:
891 vnfd_id = c_vnf["vnfd-id-ref"]
892 vnf_index = str(c_vnf["member-vnf-index"])
893 vnfd = needed_vnfd[vnfd_id]
894
895 # Check if this VNF has a charm configuration
896 vnf_config = vnfd.get("vnf-configuration")
897
898 if vnf_config and vnf_config.get("juju"):
899 proxy_charm = vnf_config["juju"]["charm"]
900 params = {}
901
902 if proxy_charm:
903 if 'initial-config-primitive' in vnf_config:
904 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
905
906 deploy()
907 number_to_configure += 1
908
909 # Deploy charms for each VDU that supports one.
910 for vdu in vnfd['vdu']:
911 vdu_config = vdu.get('vdu-configuration')
912 proxy_charm = None
913 params = {}
914
915 if vdu_config and vdu_config.get("juju"):
916 proxy_charm = vdu_config["juju"]["charm"]
917
918 if 'initial-config-primitive' in vdu_config:
919 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
920
921 if proxy_charm:
922 deploy()
923 number_to_configure += 1
924
925 if number_to_configure:
926 db_nsr["config-status"] = "configuring"
927 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
928 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
929 else:
930 db_nslcmop["operationState"] = "COMPLETED"
931 db_nslcmop["detailed-status"] = "done"
932 db_nsr["config-status"] = "configured"
933 db_nsr["detailed-status"] = "done"
934 db_nsr["operational-status"] = "running"
935 self.update_db("nsrs", nsr_id, db_nsr)
936 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
937 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
938 return nsr_lcm
939
940 except (ROclient.ROClientException, DbException, LcmException) as e:
941 self.logger.error(logging_text + "Exit Exception {}".format(e))
942 exc = e
943 except Exception as e:
944 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
945 exc = e
946 finally:
947 if exc:
948 if db_nsr:
949 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
950 db_nsr["operational-status"] = "failed"
951 self.update_db("nsrs", nsr_id, db_nsr)
952 if db_nslcmop:
953 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
954 db_nslcmop["operationState"] = "FAILED"
955 db_nslcmop["statusEnteredTime"] = time()
956 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
957
async def ns_terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: remove its charms from the VCA and delete its ns, nsd and vnfds at RO.

    Individual deletion failures do not abort the procedure; they are collected and reported together
    in the final status written to the "nsrs" and "nslcmops" database records.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record describing this operation
    :return: None. The outcome is stored in the database
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    # pending database updates; they must exist before the 'try' because 'finally' reads them on every path
    db_nsr_update = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop from db"
    failed_detail = []   # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # work over a copy, it is written back to database at the end
        nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return   # nothing to terminate; no database update is pending at this point

        db_nsr_update = {
            "operational-status": "terminating",
            "config-status": "terminating",
            "detailed-status": "Deleting charms",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Launch the removal of charms. Tasks are awaited later, after the deletion at RO
        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                        )
                    )
                    vca_task_list.append(task)
                    vca_task_dict[vnf_index] = task
                    self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            # best effort: a failure launching the removal of charms must not prevent the deletion at RO
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Wait for the removal of charms launched at the beginning
        if vca_task_list:
            await asyncio.wait(vca_task_list, timeout=300)
            for vnf_index, task in vca_task_dict.items():
                if task.cancelled():
                    failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
                elif task.done():
                    # use a dedicated name: overwriting 'exc' would make the 'finally' clause discard the
                    # aggregated report built below
                    task_exc = task.exception()
                    if task_exc:
                        failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, task_exc))
                    else:
                        nsr_lcm["VCA"][vnf_index] = None
                else:  # timeout
                    # TODO Should it be cancelled?!!
                    task.cancel()
                    failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))

        if failed_detail:
            self.logger.error(logging_text + "; ".join(failed_detail))
            db_nsr_update = {
                "operational-status": "failed",
                "detailed-status": "Deletion errors " + "; ".join(failed_detail),
                "_admin": {"deployed": nsr_lcm, }
            }
            db_nslcmop_update = {
                "detailed-status": "; ".join(failed_detail),
                "operationState": "FAILED",
                "statusEnteredTime": time()
            }
        elif db_nslcmop["operationParams"].get("autoremove"):
            self.db.del_one("nsrs", {"_id": nsr_id})
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            # the records have just been deleted, so there is nothing left to update
            db_nsr_update = None
            db_nslcmop_update = None
        else:
            db_nsr_update = {
                "operational-status": "terminated",
                "detailed-status": "Done",
                "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
            }
            db_nslcmop_update = {
                "detailed-status": "Done",
                "operationState": "COMPLETED",
                "statusEnteredTime": time()
            }
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1133
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute a primitive over the charm deployed for one vnf of a network service and wait for it to end.

    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record. Its "operationParams" provide
        "vnf_member_index", "primitive" and "primitive_params"
    :return: None. The result is stored in the "nslcmops" database record
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    step = "Getting information from database"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]

        # TODO check if ns is in a proper status
        # a missing "deployed" or "VCA" section is reported as a not deployed charm instead of a
        # TypeError/KeyError
        vca_deployed = ((nsr_lcm or {}).get("VCA") or {}).get(vnf_index)
        if not vca_deployed:
            raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
                vnf_index, vca_deployed["operational-status"]))
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, vnf_index, None]

        step = "Executing primitive {}".format(primitive)
        await self.n2vc.login()
        task = asyncio.ensure_future(
            self.n2vc.ExecutePrimitive(
                model_name,
                application_name,
                primitive, callback,
                *callback_args,
                **primitive_params
            )
        )
        # wait until completed with timeout
        await asyncio.wait((task,), timeout=300)

        result = "FAILED"  # by default
        result_detail = ""
        if task.cancelled():
            result_detail = "Task has been cancelled"
        elif task.done():
            # keep the task exception apart from 'exc': the failure of the primitive is reported by
            # the update built below, and 'finally' must not replace it
            task_exc = task.exception()
            if task_exc:
                result_detail = str(task_exc)
            else:
                self.logger.debug(logging_text + " task Done")
                # TODO revise with Adam if action is finished and ok when task is done or callback is needed
                result = "COMPLETED"
                result_detail = "Done"
        else:  # timeout
            # TODO Should it be cancelled?!!
            task.cancel()
            result_detail = "timeout"

        db_nslcmop_update = {
            "detailed-status": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1221
async def test(self, param=None):
    """
    Debugging helper that only logs the received parameter and finishes.
    :param param: any value; it is written to the debug log
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
1224
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified by _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id: nsr, vim or sdn identity
    :return: None, or raises LcmException if topic is not one of the supported values
    """
    tasks_by_topic = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    if topic not in tasks_by_topic:
        # fail with a clear message instead of an UnboundLocalError further down
        raise LcmException("cancel_tasks: unknown topic '{}'".format(topic))
    lcm_tasks = tasks_by_topic[topic]

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() returns False when the task is already done or cancelled
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
1247
async def kafka_ping(self):
    """
    Periodically write a ping message to the "admin" topic addressed to LCM itself, to check that the
    message bus is working. self.pings_not_received is increased on every ping sent; kafka_read sets it
    back to 0 when the ping is read.
    :return: never returns normally
    :raises LcmException: if more than 10 consecutive pings are not received back
    :raises Exception: the last write error, once too many consecutive errors have happened
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # time between pings are low when it is not received and at starting
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # no 'loop' argument: it is not needed inside a coroutine and was removed in python 3.10
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # while first_start no ping has been written successfully yet, so more errors are
            # tolerated and more time is left between retries to allow kafka to start
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 1 if not first_start else 5
            await asyncio.sleep(wait_time)
1279
async def kafka_read(self):
    """
    Main loop: read messages from the "admin", "ns", "vim_account" and "sdn" topics and launch the
    task that serves each command. Launched tasks are annotated at self.lcm_ns_tasks,
    self.lcm_vim_tasks or self.lcm_sdn_tasks so that they can be cancelled later.
    :return: None, when the command "exit" is received or after 10 consecutive errors
    :raises Exception: the last error, once too many consecutive errors have happened
    """
    self.logger.debug("Task kafka_read Enter")
    order_id = 1
    consecutive_errors = 0
    first_start = True
    # NOTE(review): while first_start up to 30 errors are tolerated below, but this condition ends
    # the loop silently at 10. Confirm which limit is the intended one
    while consecutive_errors < 10:
        try:
            topics = ("admin", "ns", "vim_account", "sdn")
            topic, command, params = await self.msg.aioread(topics, self.loop)
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            first_start = False
            order_id += 1

            # commands valid for any topic
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
                continue

            if topic == "admin":
                if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                    # own ping received back; kafka_ping checks this counter
                    self.pings_not_received = 0
                    continue
            elif topic == "ns":
                if command == "instantiate":
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_instantiate": task}
                    continue
                elif command == "terminate":
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    # any ongoing operation over this ns is cancelled before terminating it
                    self.cancel_tasks(topic, nsr_id)
                    task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_terminate": task}
                    continue
                elif command == "action":
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_action": task}
                    continue
                elif command == "show":
                    # NOTE(review): assumes params carries the nsr identity, either directly or under
                    # "_id" - confirm against the sender of this command
                    nsr_id = params.get("_id") if isinstance(params, dict) else params
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}\n    detailed-status: "
                            "{}\n    deploy: {}\n    tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
                elif command == "deleted":
                    continue  # TODO cleaning of task just in case should be done
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    task = asyncio.ensure_future(self.vim_create(params, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_create": task}
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_delete": task}
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    task = asyncio.ensure_future(self.vim_edit(vim_id, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_edit": task}
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    task = asyncio.ensure_future(self.sdn_create(params, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_create": task}
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_delete": task}
                    continue
                elif command == "edit":
                    task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_edit": task}
                    continue
            # reached by an unknown topic and also by an unknown command of a known topic
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            # while first_start no message has been read yet, so more errors are tolerated and
            # more time is left between retries to allow kafka to start
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 2 if not first_start else 5
            # no 'loop' argument: it is not needed inside a coroutine and was removed in python 3.10
            await asyncio.sleep(wait_time)

    self.logger.debug("Task kafka_read exit")
1419
def start(self):
    """
    Run the kafka_read and kafka_ping tasks until both finish or one of them raises, and then close
    the event loop and disconnect from database, message bus and file storage.
    :return: None
    :raises Exception: whatever kafka_read or kafka_ping raise. Resources are released in any case
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.cancel_tasks("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.cancel_tasks("ALL", "ALL")
    finally:
        # release everything also when one of the tasks ends with an exception
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1444
1445
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and override it with the environment variables whose
    name starts with "OSMLCM_".

    The variable name is lowercased and split by "_": the items between the prefix and the last one
    select nested sections ("ro" and "vca" are converted to "RO" and "VCA") and the last one is the
    key to set. A key named "port" is converted to integer. As a consequence, keys containing "_"
    cannot be overridden in this way.
    :param config_file: path of the yaml configuration file
    :return: the configuration loaded from the file. The process exits with code 1 if the file
        cannot be read or parsed
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load builds only plain yaml types, and works with any PyYAML version
            # (yaml.load requires an explicit Loader in recent ones)
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                # a wrong variable is ignored, it must not prevent starting
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        sys.exit(1)
1475
1476
if __name__ == '__main__':
    # Script entry point: the configuration file is looked for at the current working directory
    config_file = "lcm.cfg"

    lcm = Lcm(config_file)
    lcm.start()