FIX bug 576
[osm/LCM.git] / osm_lcm / ns.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import logging
7 import logging.handlers
8 import functools
9 import traceback
10
11 import ROclient
12 from lcm_utils import LcmException, LcmBase
13
14 from osm_common.dbbase import DbException
15 from osm_common.fsbase import FsException
16 from n2vc.vnf import N2VC
17
18 from copy import copy, deepcopy
19 from http import HTTPStatus
20 from time import time
21 from uuid import uuid4
22
23 __author__ = "Alfonso Tierno"
24
25
26 def get_iterable(in_dict, in_key):
27 """
28 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
29 :param in_dict: a dictionary
30 :param in_key: the key to look for at in_dict
31 :return: in_dict[in_var] or () if it is None or not present
32 """
33 if not in_dict.get(in_key):
34 return ()
35 return in_dict[in_key]
36
37
38 def populate_dict(target_dict, key_list, value):
39 """
40 Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
41 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
42 :param target_dict: dictionary to be changed
43 :param key_list: list of keys to insert at target_dict
44 :param value:
45 :return: None
46 """
47 for key in key_list[0:-1]:
48 if key not in target_dict:
49 target_dict[key] = {}
50 target_dict = target_dict[key]
51 target_dict[key_list[-1]] = value
52
53
54 class NsLcm(LcmBase):
55
56 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
57 """
58 Init, Connect to database, filesystem storage, and messaging
59 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
60 :return: None
61 """
62 # logging
63 self.logger = logging.getLogger('lcm.ns')
64 self.loop = loop
65 self.lcm_tasks = lcm_tasks
66
67 super().__init__(db, msg, fs, self.logger)
68
69 self.ro_config = ro_config
70
71 self.n2vc = N2VC(
72 log=self.logger,
73 server=vca_config['host'],
74 port=vca_config['port'],
75 user=vca_config['user'],
76 secret=vca_config['secret'],
77 # TODO: This should point to the base folder where charms are stored,
78 # if there is a common one (like object storage). Otherwise, leave
79 # it unset and pass it via DeployCharms
80 # artifacts=vca_config[''],
81 artifacts=None,
82 )
83
84 def _look_for_pdu(self, vdur, vnfr, vim_account): # TODO feature 1417: project_id
85 """
86 Look for a free PDU in the catalog matching vdur type and interfaces. Fills vdur with ip_address information
87 :param vdur: vnfr:vdur descriptor. It is modified with pdu interface info if pdu is found
88 :param member_vnf_index: used just for logging. Target vnfd of nsd
89 :param vim_account:
90 :return: vder_update: dictionary to update vnfr:vdur with pdu info. In addition it modified choosen pdu to set
91 at status IN_USE
92 """
93 pdu_type = vdur.get("pdu-type")
94 assert pdu_type
95 pdu_filter = {
96 "vim.vim_accounts": vim_account,
97 "type": pdu_type,
98 "_admin.operationalState": "ENABLED",
99 "_admin.usageSate": "NOT_IN_USE",
100 # TODO feature 1417: "shared": True,
101 # TODO feature 1417: "_admin.projects_read.cont": ["ANY", project_id],
102 }
103 available_pdus = self.db.get_list("pdus", pdu_filter)
104 for pdu in available_pdus:
105 # step 1 check if this pdu contains needed interfaces:
106 match_interfaces = True
107 for vdur_interface in vdur["interfaces"]:
108 for pdu_interface in pdu["interfaces"]:
109 if pdu_interface["name"] == vdur_interface["name"]:
110 # TODO feature 1417: match per mgmt type
111 break
112 else: # no interface found
113 match_interfaces = False
114 break
115 if not match_interfaces:
116 continue
117
118 # step 2. Update pdu
119 self.update_db_2("pdus", pdu["_id"], {"_admin.usageSate": "IN_USE",
120 "_admin.usage.vnfr_id": vnfr["_id"],
121 "_admin.usage.nsr_id": vnfr["nsr-id-ref"],
122 "_admin.usage.vdur": vdur["vdu-id-ref"],
123 })
124 # step 3. Fill vnfr info by filling vdur
125 vdur["pdu-id"] = pdu["_id"]
126 for vdur_interface in vdur["interfaces"]:
127 for pdu_interface in pdu["interfaces"]:
128 if pdu_interface["name"] == vdur_interface["name"]:
129 vdur_interface.update(pdu_interface)
130 break
131
132 return
133 raise LcmException("No PDU of type={} found for member_vng_index={} at vim_account={} matching interface "
134 "names".format(vdur["vdu-id-ref"], vnfr["member-vnf-index-ref"], pdu_type))
135
136 def vnfd2RO(self, vnfd, new_id=None):
137 """
138 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
139 :param vnfd: input vnfd
140 :param new_id: overrides vnf id if provided
141 :return: copy of vnfd
142 """
143 ci_file = None
144 try:
145 vnfd_RO = deepcopy(vnfd)
146 vnfd_RO.pop("_id", None)
147 vnfd_RO.pop("_admin", None)
148 if new_id:
149 vnfd_RO["id"] = new_id
150 for vdu in vnfd_RO["vdu"]:
151 if "cloud-init-file" in vdu:
152 base_folder = vnfd["_admin"]["storage"]
153 clout_init_file = "{}/{}/cloud_init/{}".format(
154 base_folder["folder"],
155 base_folder["pkg-dir"],
156 vdu["cloud-init-file"]
157 )
158 ci_file = self.fs.file_open(clout_init_file, "r")
159 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
160 # convert to base 64 or similar
161 clout_init_content = ci_file.read()
162 ci_file.close()
163 ci_file = None
164 vdu.pop("cloud-init-file", None)
165 vdu["cloud-init"] = clout_init_content
166 # remnove unused by RO configuration, monitoring, scaling
167 vnfd_RO.pop("vnf-configuration", None)
168 vnfd_RO.pop("monitoring-param", None)
169 vnfd_RO.pop("scaling-group-descriptor", None)
170 return vnfd_RO
171 except FsException as e:
172 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
173 finally:
174 if ci_file:
175 ci_file.close()
176
177 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
178 """
179 Callback both for charm status change and task completion
180 :param model_name: Charm model name
181 :param application_name: Charm application name
182 :param status: Can be
183 - blocked: The unit needs manual intervention
184 - maintenance: The unit is actively deploying/configuring
185 - waiting: The unit is waiting for another charm to be ready
186 - active: The unit is deployed, configured, and ready
187 - error: The charm has failed and needs attention.
188 - terminated: The charm has been destroyed
189 - removing,
190 - removed
191 :param message: detailed message error
192 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
193 nsr_id:
194 nslcmop_id:
195 lcmOperationType: currently "instantiate"
196 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
197 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
198 n2vc_event: event used to notify instantiation task that some change has been produced
199 :param task: None for charm status change, or task for completion task callback
200 :return:
201 """
202 try:
203 nsr_id = n2vc_info["nsr_id"]
204 deployed = n2vc_info["deployed"]
205 db_nsr_update = n2vc_info["db_update"]
206 nslcmop_id = n2vc_info["nslcmop_id"]
207 ns_operation = n2vc_info["lcmOperationType"]
208 n2vc_event = n2vc_info["n2vc_event"]
209 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
210 application_name)
211 vca_deployed = deployed.get(application_name)
212 if not vca_deployed:
213 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
214 return
215
216 if task:
217 if task.cancelled():
218 self.logger.debug(logging_text + " task Cancelled")
219 vca_deployed['operational-status'] = "error"
220 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error"
221 vca_deployed['detailed-status'] = "Task Cancelled"
222 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = "Task Cancelled"
223
224 elif task.done():
225 exc = task.exception()
226 if exc:
227 self.logger.error(logging_text + " task Exception={}".format(exc))
228 vca_deployed['operational-status'] = "error"
229 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error"
230 vca_deployed['detailed-status'] = str(exc)
231 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(exc)
232 else:
233 self.logger.debug(logging_text + " task Done")
234 # task is Done, but callback is still ongoing. So ignore
235 return
236 elif status:
237 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
238 if vca_deployed['operational-status'] == status:
239 return # same status, ignore
240 vca_deployed['operational-status'] = status
241 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = status
242 vca_deployed['detailed-status'] = str(message)
243 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(message)
244 else:
245 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
246 return
247 # wake up instantiate task
248 n2vc_event.set()
249 except Exception as e:
250 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
251
252 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
253 """
254 Creates a RO ns descriptor from OSM ns_instantiate params
255 :param ns_params: OSM instantiate params
256 :return: The RO ns descriptor
257 """
258 vim_2_RO = {}
259 # TODO feature 1417: Check that no instantiation is set over PDU
260 # check if PDU forces a concrete vim-network-id and add it
261 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
262
263 def vim_account_2_RO(vim_account):
264 if vim_account in vim_2_RO:
265 return vim_2_RO[vim_account]
266
267 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
268 if db_vim["_admin"]["operationalState"] != "ENABLED":
269 raise LcmException("VIM={} is not available. operationalState={}".format(
270 vim_account, db_vim["_admin"]["operationalState"]))
271 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
272 vim_2_RO[vim_account] = RO_vim_id
273 return RO_vim_id
274
275 def ip_profile_2_RO(ip_profile):
276 RO_ip_profile = deepcopy((ip_profile))
277 if "dns-server" in RO_ip_profile:
278 if isinstance(RO_ip_profile["dns-server"], list):
279 RO_ip_profile["dns-address"] = []
280 for ds in RO_ip_profile.pop("dns-server"):
281 RO_ip_profile["dns-address"].append(ds['address'])
282 else:
283 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
284 if RO_ip_profile.get("ip-version") == "ipv4":
285 RO_ip_profile["ip-version"] = "IPv4"
286 if RO_ip_profile.get("ip-version") == "ipv6":
287 RO_ip_profile["ip-version"] = "IPv6"
288 if "dhcp-params" in RO_ip_profile:
289 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
290 return RO_ip_profile
291
292 if not ns_params:
293 return None
294 RO_ns_params = {
295 # "name": ns_params["nsName"],
296 # "description": ns_params.get("nsDescription"),
297 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
298 # "scenario": ns_params["nsdId"],
299 }
300 if n2vc_key_list:
301 for vnfd_ref, vnfd in vnfd_dict.items():
302 vdu_needed_access = []
303 mgmt_cp = None
304 if vnfd.get("vnf-configuration"):
305 if vnfd.get("mgmt-interface"):
306 if vnfd["mgmt-interface"].get("vdu-id"):
307 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
308 elif vnfd["mgmt-interface"].get("cp"):
309 mgmt_cp = vnfd["mgmt-interface"]["cp"]
310
311 for vdu in vnfd.get("vdu"):
312 if vdu.get("vdu-configuration"):
313 vdu_needed_access.append(vdu["id"])
314 elif mgmt_cp:
315 for vdu_interface in vdu.get("interface"):
316 if vdu_interface.get("external-connection-point-ref") and \
317 vdu_interface["external-connection-point-ref"] == mgmt_cp:
318 vdu_needed_access.append(vdu["id"])
319 mgmt_cp = None
320 break
321
322 if vdu_needed_access:
323 for vnf_member in nsd.get("constituent-vnfd"):
324 if vnf_member["vnfd-id-ref"] != vnfd_ref:
325 continue
326 for vdu in vdu_needed_access:
327 populate_dict(RO_ns_params,
328 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
329 n2vc_key_list)
330
331 if ns_params.get("vduImage"):
332 RO_ns_params["vduImage"] = ns_params["vduImage"]
333
334 if ns_params.get("ssh_keys"):
335 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
336 for vnf_params in get_iterable(ns_params, "vnf"):
337 for constituent_vnfd in nsd["constituent-vnfd"]:
338 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
339 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
340 break
341 else:
342 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
343 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
344 if vnf_params.get("vimAccountId"):
345 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
346 vim_account_2_RO(vnf_params["vimAccountId"]))
347
348 for vdu_params in get_iterable(vnf_params, "vdu"):
349 # TODO feature 1417: check that this VDU exist and it is not a PDU
350 if vdu_params.get("volume"):
351 for volume_params in vdu_params["volume"]:
352 if volume_params.get("vim-volume-id"):
353 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
354 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
355 volume_params["vim-volume-id"])
356 if vdu_params.get("interface"):
357 for interface_params in vdu_params["interface"]:
358 if interface_params.get("ip-address"):
359 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
360 vdu_params["id"], "interfaces", interface_params["name"],
361 "ip_address"),
362 interface_params["ip-address"])
363 if interface_params.get("mac-address"):
364 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
365 vdu_params["id"], "interfaces", interface_params["name"],
366 "mac_address"),
367 interface_params["mac-address"])
368 if interface_params.get("floating-ip-required"):
369 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
370 vdu_params["id"], "interfaces", interface_params["name"],
371 "floating-ip"),
372 interface_params["floating-ip-required"])
373
374 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
375 if internal_vld_params.get("vim-network-name"):
376 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
377 internal_vld_params["name"], "vim-network-name"),
378 internal_vld_params["vim-network-name"])
379 if internal_vld_params.get("ip-profile"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
381 internal_vld_params["name"], "ip-profile"),
382 ip_profile_2_RO(internal_vld_params["ip-profile"]))
383
384 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
385 # look for interface
386 iface_found = False
387 for vdu_descriptor in vnf_descriptor["vdu"]:
388 for vdu_interface in vdu_descriptor["interface"]:
389 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
390 if icp_params.get("ip-address"):
391 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
392 vdu_descriptor["id"], "interfaces",
393 vdu_interface["name"], "ip_address"),
394 icp_params["ip-address"])
395
396 if icp_params.get("mac-address"):
397 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
398 vdu_descriptor["id"], "interfaces",
399 vdu_interface["name"], "mac_address"),
400 icp_params["mac-address"])
401 iface_found = True
402 break
403 if iface_found:
404 break
405 else:
406 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
407 "internal-vld:id-ref={} is not present at vnfd:internal-"
408 "connection-point".format(vnf_params["member-vnf-index"],
409 icp_params["id-ref"]))
410
411 for vld_params in get_iterable(ns_params, "vld"):
412 if "ip-profile" in vld_params:
413 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
414 ip_profile_2_RO(vld_params["ip-profile"]))
415 if vld_params.get("vim-network-name"):
416 RO_vld_sites = []
417 if isinstance(vld_params["vim-network-name"], dict):
418 for vim_account, vim_net in vld_params["vim-network-name"].items():
419 RO_vld_sites.append({
420 "netmap-use": vim_net,
421 "datacenter": vim_account_2_RO(vim_account)
422 })
423 else: # isinstance str
424 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
425 if RO_vld_sites:
426 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
427 if "vnfd-connection-point-ref" in vld_params:
428 for cp_params in vld_params["vnfd-connection-point-ref"]:
429 # look for interface
430 for constituent_vnfd in nsd["constituent-vnfd"]:
431 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
432 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
433 break
434 else:
435 raise LcmException(
436 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
437 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
438 match_cp = False
439 for vdu_descriptor in vnf_descriptor["vdu"]:
440 for interface_descriptor in vdu_descriptor["interface"]:
441 if interface_descriptor.get("external-connection-point-ref") == \
442 cp_params["vnfd-connection-point-ref"]:
443 match_cp = True
444 break
445 if match_cp:
446 break
447 else:
448 raise LcmException(
449 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
450 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
451 cp_params["member-vnf-index-ref"],
452 cp_params["vnfd-connection-point-ref"],
453 vnf_descriptor["id"]))
454 if cp_params.get("ip-address"):
455 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
456 vdu_descriptor["id"], "interfaces",
457 interface_descriptor["name"], "ip_address"),
458 cp_params["ip-address"])
459 if cp_params.get("mac-address"):
460 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
461 vdu_descriptor["id"], "interfaces",
462 interface_descriptor["name"], "mac_address"),
463 cp_params["mac-address"])
464 return RO_ns_params
465
466 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
467 # make a copy to do not change
468 vdu_create = copy(vdu_create)
469 vdu_delete = copy(vdu_delete)
470
471 vdurs = db_vnfr.get("vdur")
472 if vdurs is None:
473 vdurs = []
474 vdu_index = len(vdurs)
475 while vdu_index:
476 vdu_index -= 1
477 vdur = vdurs[vdu_index]
478 if vdur.get("pdu-type"):
479 continue
480 vdu_id_ref = vdur["vdu-id-ref"]
481 if vdu_create and vdu_create.get(vdu_id_ref):
482 for index in range(0, vdu_create[vdu_id_ref]):
483 vdur = deepcopy(vdur)
484 vdur["_id"] = str(uuid4())
485 vdur["count-index"] += 1
486 vdurs.insert(vdu_index+1+index, vdur)
487 del vdu_create[vdu_id_ref]
488 if vdu_delete and vdu_delete.get(vdu_id_ref):
489 del vdurs[vdu_index]
490 vdu_delete[vdu_id_ref] -= 1
491 if not vdu_delete[vdu_id_ref]:
492 del vdu_delete[vdu_id_ref]
493 # check all operations are done
494 if vdu_create or vdu_delete:
495 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
496 vdu_create))
497 if vdu_delete:
498 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
499 vdu_delete))
500
501 vnfr_update = {"vdur": vdurs}
502 db_vnfr["vdur"] = vdurs
503 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
504
505 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
506 """
507 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
508 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
509 :param nsr_desc_RO: nsr descriptor from RO
510 :return: Nothing, LcmException is raised on errors
511 """
512 for vnf_index, db_vnfr in db_vnfrs.items():
513 for vnf_RO in nsr_desc_RO["vnfs"]:
514 if vnf_RO["member_vnf_index"] != vnf_index:
515 continue
516 vnfr_update = {}
517 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO.get("ip_address")
518
519 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
520 vdur_RO_count_index = 0
521 if vdur.get("pdu-type"):
522 continue
523 for vdur_RO in get_iterable(vnf_RO, "vms"):
524 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
525 continue
526 if vdur["count-index"] != vdur_RO_count_index:
527 vdur_RO_count_index += 1
528 continue
529 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
530 vdur["ip-address"] = vdur_RO.get("ip_address")
531 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
532 vdur["name"] = vdur_RO.get("vim_name")
533 vdur["status"] = vdur_RO.get("status")
534 vdur["status-detailed"] = vdur_RO.get("error_msg")
535 for ifacer in get_iterable(vdur, "interfaces"):
536 for interface_RO in get_iterable(vdur_RO, "interfaces"):
537 if ifacer["name"] == interface_RO.get("internal_name"):
538 ifacer["ip-address"] = interface_RO.get("ip_address")
539 ifacer["mac-address"] = interface_RO.get("mac_address")
540 break
541 else:
542 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
543 "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
544 vnfr_update["vdur.{}".format(vdu_index)] = vdur
545 break
546 else:
547 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
548 "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
549 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
550 break
551
552 else:
553 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
554
555 async def instantiate(self, nsr_id, nslcmop_id):
556 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
557 self.logger.debug(logging_text + "Enter")
558 # get all needed from database
559 db_nsr = None
560 db_nslcmop = None
561 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
562 db_nslcmop_update = {}
563 nslcmop_operation_state = None
564 db_vnfrs = {}
565 RO_descriptor_number = 0 # number of descriptors created at RO
566 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
567 n2vc_info = {}
568 exc = None
569 try:
570 step = "Getting nslcmop={} from db".format(nslcmop_id)
571 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
572 step = "Getting nsr={} from db".format(nsr_id)
573 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
574 ns_params = db_nsr.get("instantiate_params")
575 nsd = db_nsr["nsd"]
576 nsr_name = db_nsr["name"] # TODO short-name??
577
578 # look if previous tasks in process
579 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
580 if task_dependency:
581 step = db_nslcmop_update["detailed-status"] = \
582 "Waiting for related tasks to be completed: {}".format(task_name)
583 self.logger.debug(logging_text + step)
584 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
585 _, pending = await asyncio.wait(task_dependency, timeout=3600)
586 if pending:
587 raise LcmException("Timeout waiting related tasks to be completed")
588
589 step = "Getting vnfrs from db"
590 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
591 db_vnfds_ref = {}
592 db_vnfds = {}
593 for vnfr in db_vnfrs_list:
594 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
595 vnfd_id = vnfr["vnfd-id"]
596 vnfd_ref = vnfr["vnfd-ref"]
597 if vnfd_id not in db_vnfds:
598 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
599 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
600 db_vnfds_ref[vnfd_ref] = vnfd
601 db_vnfds[vnfd_id] = vnfd
602
603 # update VNFR vimAccount and pdu information
604
605 vim_account = vnfr.get("vim-account-id")
606 vnfr_update = {}
607 if not vim_account:
608 step = "Updating VNFR vimAccount"
609 vim_account = db_nsr["instantiate_params"]["vimAccountId"]
610 # check instantiate parameters
611 if db_nsr["instantiate_params"].get("vnf"):
612 for vnf_params in db_nsr["instantiate_params"]["vnf"]:
613 if vnf_params.get("member-vnf-index") == vnfr["member-vnf-index-ref"]:
614 if vnf_params.get("vimAccountId"):
615 vim_account = vnf_params.get("vimAccountId")
616 break
617 vnfr_update["vim-account-id"] = vim_account
618 vnfr["vim-account-id"] = vim_account
619
620 # check PDUs and get PDU
621 for vdur_index, vdur in enumerate(get_iterable(vnfr, "vdur")):
622 pdu_type = vdur.get("pdu-type")
623 if not pdu_type:
624 continue
625 # look for free pdu
626 self._look_for_pdu(vdur, vnfr, vim_account)
627 # fill vnfr with pdu info
628 vnfr_update["vdur.{}".format(vdur_index)] = vdur
629 if vnfr_update:
630 self.update_db_2("vnfrs", vnfr["_id"], vnfr_update)
631
632 nsr_lcm = db_nsr["_admin"].get("deployed")
633 if not nsr_lcm:
634 nsr_lcm = db_nsr["_admin"]["deployed"] = {
635 "id": nsr_id,
636 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
637 "nsr_ip": {},
638 "VCA": {},
639 }
640 db_nsr_update["detailed-status"] = "creating"
641 db_nsr_update["operational-status"] = "init"
642
643 RO = ROclient.ROClient(self.loop, **self.ro_config)
644
645 # get vnfds, instantiate at RO
646 for vnfd_id, vnfd in db_vnfds.items():
647 vnfd_ref = vnfd["id"]
648 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
649 # self.logger.debug(logging_text + step)
650 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
651 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
652 RO_descriptor_number += 1
653
654 # look if present
655 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
656 if vnfd_list:
657 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
658 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
659 vnfd_ref, vnfd_list[0]["uuid"]))
660 else:
661 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
662 desc = await RO.create("vnfd", descriptor=vnfd_RO)
663 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
664 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
665 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
666 vnfd_ref, desc["uuid"]))
667 self.update_db_2("nsrs", nsr_id, db_nsr_update)
668
669 # create nsd at RO
670 nsd_ref = nsd["id"]
671 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
672 # self.logger.debug(logging_text + step)
673
674 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
675 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
676 RO_descriptor_number += 1
677 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
678 if nsd_list:
679 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
680 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
681 nsd_ref, RO_nsd_uuid))
682 else:
683 nsd_RO = deepcopy(nsd)
684 nsd_RO["id"] = RO_osm_nsd_id
685 nsd_RO.pop("_id", None)
686 nsd_RO.pop("_admin", None)
687 for c_vnf in nsd_RO["constituent-vnfd"]:
688 vnfd_id = c_vnf["vnfd-id-ref"]
689 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
690 desc = await RO.create("nsd", descriptor=nsd_RO)
691 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
692 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
693 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
694 self.update_db_2("nsrs", nsr_id, db_nsr_update)
695
696 # Crate ns at RO
697 # if present use it unless in error status
698 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
699 if RO_nsr_id:
700 try:
701 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
702 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
703 desc = await RO.show("ns", RO_nsr_id)
704 except ROclient.ROClientException as e:
705 if e.http_code != HTTPStatus.NOT_FOUND:
706 raise
707 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
708 if RO_nsr_id:
709 ns_status, ns_status_info = RO.check_ns_status(desc)
710 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
711 if ns_status == "ERROR":
712 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
713 self.logger.debug(logging_text + step)
714 await RO.delete("ns", RO_nsr_id)
715 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
716 if not RO_nsr_id:
717 step = db_nsr_update["detailed-status"] = "Checking dependencies"
718 # self.logger.debug(logging_text + step)
719
720 # check if VIM is creating and wait look if previous tasks in process
721 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
722 if task_dependency:
723 step = "Waiting for related tasks to be completed: {}".format(task_name)
724 self.logger.debug(logging_text + step)
725 await asyncio.wait(task_dependency, timeout=3600)
726 if ns_params.get("vnf"):
727 for vnf in ns_params["vnf"]:
728 if "vimAccountId" in vnf:
729 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
730 vnf["vimAccountId"])
731 if task_dependency:
732 step = "Waiting for related tasks to be completed: {}".format(task_name)
733 self.logger.debug(logging_text + step)
734 await asyncio.wait(task_dependency, timeout=3600)
735
736 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
737
738 # feature 1429. Add n2vc public key to needed VMs
739 n2vc_key = await self.n2vc.GetPublicKey()
740 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
741
742 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
743 desc = await RO.create("ns", descriptor=RO_ns_params,
744 name=db_nsr["name"],
745 scenario=RO_nsd_uuid)
746 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
747 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
748 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
749 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
750 self.update_db_2("nsrs", nsr_id, db_nsr_update)
751
752 # wait until NS is ready
753 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
754 detailed_status_old = None
755 self.logger.debug(logging_text + step)
756
757 deployment_timeout = 2 * 3600 # Two hours
758 while deployment_timeout > 0:
759 desc = await RO.show("ns", RO_nsr_id)
760 ns_status, ns_status_info = RO.check_ns_status(desc)
761 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
762 if ns_status == "ERROR":
763 raise ROclient.ROClientException(ns_status_info)
764 elif ns_status == "BUILD":
765 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
766 elif ns_status == "ACTIVE":
767 step = detailed_status = "Waiting for management IP address reported by the VIM"
768 try:
769 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
770 break
771 except ROclient.ROClientException as e:
772 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
773 raise e
774 else:
775 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
776 if detailed_status != detailed_status_old:
777 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
778 self.update_db_2("nsrs", nsr_id, db_nsr_update)
779 await asyncio.sleep(5, loop=self.loop)
780 deployment_timeout -= 5
781 if deployment_timeout <= 0:
782 raise ROclient.ROClientException("Timeout waiting ns to be ready")
783
784 step = "Updating VNFRs"
785 self.ns_update_vnfr(db_vnfrs, desc)
786
787 db_nsr["detailed-status"] = "Configuring vnfr"
788 self.update_db_2("nsrs", nsr_id, db_nsr_update)
789
790 # The parameters we'll need to deploy a charm
791 number_to_configure = 0
792
793 def deploy(vnf_index, vdu_id, mgmt_ip_address, n2vc_info, config_primitive=None):
794 """An inner function to deploy the charm from either vnf or vdu
795 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
796 """
797 if not mgmt_ip_address:
798 raise LcmException("vnfd/vdu has not management ip address to configure it")
799 # Login to the VCA.
800 # if number_to_configure == 0:
801 # self.logger.debug("Logging into N2VC...")
802 # task = asyncio.ensure_future(self.n2vc.login())
803 # yield from asyncio.wait_for(task, 30.0)
804 # self.logger.debug("Logged into N2VC!")
805
806 # # await self.n2vc.login()
807
808 # Note: The charm needs to exist on disk at the location
809 # specified by charm_path.
810 base_folder = vnfd["_admin"]["storage"]
811 storage_params = self.fs.get_params()
812 charm_path = "{}{}/{}/charms/{}".format(
813 storage_params["path"],
814 base_folder["folder"],
815 base_folder["pkg-dir"],
816 proxy_charm
817 )
818
819 # Setup the runtime parameters for this VNF
820 params = {'rw_mgmt_ip': mgmt_ip_address}
821 if config_primitive:
822 params["initial-config-primitive"] = config_primitive
823
824 # ns_name will be ignored in the current version of N2VC
825 # but will be implemented for the next point release.
826 model_name = 'default'
827 vdu_id_text = "vnfd"
828 if vdu_id:
829 vdu_id_text = vdu_id
830 application_name = self.n2vc.FormatApplicationName(
831 nsr_name,
832 vnf_index,
833 vdu_id_text
834 )
835 if not nsr_lcm.get("VCA"):
836 nsr_lcm["VCA"] = {}
837 nsr_lcm["VCA"][application_name] = db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = {
838 "member-vnf-index": vnf_index,
839 "vdu_id": vdu_id,
840 "model": model_name,
841 "application": application_name,
842 "operational-status": "init",
843 "detailed-status": "",
844 "vnfd_id": vnfd_id,
845 }
846 self.update_db_2("nsrs", nsr_id, db_nsr_update)
847
848 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
849 proxy_charm))
850 if not n2vc_info:
851 n2vc_info["nsr_id"] = nsr_id
852 n2vc_info["nslcmop_id"] = nslcmop_id
853 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
854 n2vc_info["lcmOperationType"] = "instantiate"
855 n2vc_info["deployed"] = nsr_lcm["VCA"]
856 n2vc_info["db_update"] = db_nsr_update
857 task = asyncio.ensure_future(
858 self.n2vc.DeployCharms(
859 model_name, # The network service name
860 application_name, # The application name
861 vnfd, # The vnf descriptor
862 charm_path, # Path to charm
863 params, # Runtime params, like mgmt ip
864 {}, # for native charms only
865 self.n2vc_callback, # Callback for status changes
866 n2vc_info, # Callback parameter
867 None, # Callback parameter (task)
868 )
869 )
870 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
871 n2vc_info))
872 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
873
874 step = "Looking for needed vnfd to configure"
875 self.logger.debug(logging_text + step)
876
877 for c_vnf in nsd["constituent-vnfd"]:
878 vnfd_id = c_vnf["vnfd-id-ref"]
879 vnf_index = str(c_vnf["member-vnf-index"])
880 vnfd = db_vnfds_ref[vnfd_id]
881
882 # Check if this VNF has a charm configuration
883 vnf_config = vnfd.get("vnf-configuration")
884
885 if vnf_config and vnf_config.get("juju"):
886 proxy_charm = vnf_config["juju"]["charm"]
887 config_primitive = None
888
889 if proxy_charm:
890 if 'initial-config-primitive' in vnf_config:
891 config_primitive = vnf_config['initial-config-primitive']
892
893 # Login to the VCA. If there are multiple calls to login(),
894 # subsequent calls will be a nop and return immediately.
895 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
896 await self.n2vc.login()
897 deploy(vnf_index, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info, config_primitive)
898 number_to_configure += 1
899
900 # Deploy charms for each VDU that supports one.
901 vdu_index = 0
902 for vdu in vnfd['vdu']:
903 vdu_config = vdu.get('vdu-configuration')
904 proxy_charm = None
905 config_primitive = None
906
907 if vdu_config and vdu_config.get("juju"):
908 proxy_charm = vdu_config["juju"]["charm"]
909
910 if 'initial-config-primitive' in vdu_config:
911 config_primitive = vdu_config['initial-config-primitive']
912
913 if proxy_charm:
914 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
915 await self.n2vc.login()
916 deploy(vnf_index, vdu["id"], db_vnfrs[vnf_index]["vdur"][vdu_index]["ip-address"],
917 n2vc_info, config_primitive)
918 number_to_configure += 1
919 vdu_index += 1
920
921 db_nsr_update["operational-status"] = "running"
922 configuration_failed = False
923 if number_to_configure:
924 old_status = "configuring: init: {}".format(number_to_configure)
925 db_nsr_update["config-status"] = old_status
926 db_nsr_update["detailed-status"] = old_status
927 db_nslcmop_update["detailed-status"] = old_status
928
929 # wait until all are configured.
930 while True:
931 if db_nsr_update:
932 self.update_db_2("nsrs", nsr_id, db_nsr_update)
933 if db_nslcmop_update:
934 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
935 await n2vc_info["n2vc_event"].wait()
936 n2vc_info["n2vc_event"].clear()
937 all_active = True
938 status_map = {}
939 n2vc_error_text = [] # contain text error list. If empty no one is in error status
940 for _, vca_info in nsr_lcm["VCA"].items():
941 vca_status = vca_info["operational-status"]
942 if vca_status not in status_map:
943 # Initialize it
944 status_map[vca_status] = 0
945 status_map[vca_status] += 1
946
947 if vca_status != "active":
948 all_active = False
949 if vca_status in ("error", "blocked"):
950 n2vc_error_text.append(
951 "member_vnf_index={} vdu_id={} {}: {}".format(vca_info["member-vnf-index"],
952 vca_info["vdu_id"], vca_status,
953 vca_info["detailed-status"]))
954
955 if all_active:
956 break
957 elif n2vc_error_text:
958 db_nsr_update["config-status"] = "failed"
959 error_text = "fail configuring " + ";".join(n2vc_error_text)
960 db_nsr_update["detailed-status"] = error_text
961 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
962 db_nslcmop_update["detailed-status"] = error_text
963 db_nslcmop_update["statusEnteredTime"] = time()
964 configuration_failed = True
965 break
966 else:
967 cs = "configuring: "
968 separator = ""
969 for status, num in status_map.items():
970 cs += separator + "{}: {}".format(status, num)
971 separator = ", "
972 if old_status != cs:
973 db_nsr_update["config-status"] = cs
974 db_nsr_update["detailed-status"] = cs
975 db_nslcmop_update["detailed-status"] = cs
976 old_status = cs
977
978 if not configuration_failed:
979 # all is done
980 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
981 db_nslcmop_update["statusEnteredTime"] = time()
982 db_nslcmop_update["detailed-status"] = "done"
983 db_nsr_update["config-status"] = "configured"
984 db_nsr_update["detailed-status"] = "done"
985
986 return
987
988 except (ROclient.ROClientException, DbException, LcmException) as e:
989 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
990 exc = e
991 except asyncio.CancelledError:
992 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
993 exc = "Operation was cancelled"
994 except Exception as e:
995 exc = traceback.format_exc()
996 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
997 exc_info=True)
998 finally:
999 if exc:
1000 if db_nsr:
1001 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1002 db_nsr_update["operational-status"] = "failed"
1003 if db_nslcmop:
1004 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1005 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1006 db_nslcmop_update["statusEnteredTime"] = time()
1007 if db_nsr:
1008 db_nsr_update["_admin.nslcmop"] = None
1009 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1010 if db_nslcmop_update:
1011 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1012 if nslcmop_operation_state:
1013 try:
1014 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1015 "operationState": nslcmop_operation_state})
1016 except Exception as e:
1017 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1018
1019 self.logger.debug(logging_text + "Exit")
1020 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1021
1022 async def terminate(self, nsr_id, nslcmop_id):
1023 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1024 self.logger.debug(logging_text + "Enter")
1025 db_nsr = None
1026 db_nslcmop = None
1027 exc = None
1028 failed_detail = [] # annotates all failed error messages
1029 vca_task_list = []
1030 vca_task_dict = {}
1031 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1032 db_nslcmop_update = {}
1033 nslcmop_operation_state = None
1034 try:
1035 step = "Getting nslcmop={} from db".format(nslcmop_id)
1036 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1037 step = "Getting nsr={} from db".format(nsr_id)
1038 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1039 # nsd = db_nsr["nsd"]
1040 nsr_lcm = deepcopy(db_nsr["_admin"].get("deployed"))
1041 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1042 return
1043 # TODO ALF remove
1044 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1045 # #TODO check if VIM is creating and wait
1046 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1047
1048 db_nsr_update["operational-status"] = "terminating"
1049 db_nsr_update["config-status"] = "terminating"
1050
1051 if nsr_lcm and nsr_lcm.get("VCA"):
1052 try:
1053 step = "Scheduling configuration charms removing"
1054 db_nsr_update["detailed-status"] = "Deleting charms"
1055 self.logger.debug(logging_text + step)
1056 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1057 for application_name, deploy_info in nsr_lcm["VCA"].items():
1058 if deploy_info: # TODO it would be desirable having a and deploy_info.get("deployed"):
1059 task = asyncio.ensure_future(
1060 self.n2vc.RemoveCharms(
1061 deploy_info['model'],
1062 application_name,
1063 # self.n2vc_callback,
1064 # db_nsr,
1065 # db_nslcmop,
1066 )
1067 )
1068 vca_task_list.append(task)
1069 vca_task_dict[application_name] = task
1070 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1071 # deploy_info['application'], None, db_nsr,
1072 # db_nslcmop, vnf_index))
1073 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "delete_charm:" + application_name, task)
1074 except Exception as e:
1075 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1076
1077 # remove from RO
1078 RO_fail = False
1079 RO = ROclient.ROClient(self.loop, **self.ro_config)
1080
1081 # Delete ns
1082 RO_nsr_id = RO_delete_action = None
1083 if nsr_lcm and nsr_lcm.get("RO"):
1084 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1085 RO_delete_action = nsr_lcm["RO"].get("nsr_delete_action_id")
1086 try:
1087 if RO_nsr_id:
1088 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
1089 self.logger.debug(logging_text + step)
1090 desc = await RO.delete("ns", RO_nsr_id)
1091 RO_delete_action = desc["action_id"]
1092 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1093 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1094 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1095 if RO_delete_action:
1096 # wait until NS is deleted from VIM
1097 step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id)
1098 detailed_status_old = None
1099 self.logger.debug(logging_text + step)
1100
1101 delete_timeout = 20 * 60 # 20 minutes
1102 while delete_timeout > 0:
1103 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1104 extra_item_id=RO_delete_action)
1105 ns_status, ns_status_info = RO.check_action_status(desc)
1106 if ns_status == "ERROR":
1107 raise ROclient.ROClientException(ns_status_info)
1108 elif ns_status == "BUILD":
1109 detailed_status = step + "; {}".format(ns_status_info)
1110 elif ns_status == "ACTIVE":
1111 break
1112 else:
1113 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1114 await asyncio.sleep(5, loop=self.loop)
1115 delete_timeout -= 5
1116 if detailed_status != detailed_status_old:
1117 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1118 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1119 else: # delete_timeout <= 0:
1120 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1121
1122 except ROclient.ROClientException as e:
1123 if e.http_code == 404: # not found
1124 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1125 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1126 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1127 elif e.http_code == 409: # conflict
1128 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1129 self.logger.debug(logging_text + failed_detail[-1])
1130 RO_fail = True
1131 else:
1132 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1133 self.logger.error(logging_text + failed_detail[-1])
1134 RO_fail = True
1135
1136 # Delete nsd
1137 if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"):
1138 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1139 try:
1140 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1141 "Deleting nsd at RO"
1142 await RO.delete("nsd", RO_nsd_id)
1143 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1144 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1145 except ROclient.ROClientException as e:
1146 if e.http_code == 404: # not found
1147 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1148 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1149 elif e.http_code == 409: # conflict
1150 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1151 self.logger.debug(logging_text + failed_detail[-1])
1152 RO_fail = True
1153 else:
1154 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1155 self.logger.error(logging_text + failed_detail[-1])
1156 RO_fail = True
1157
1158 if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("vnfd_id"):
1159 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1160 if not RO_vnfd_id:
1161 continue
1162 try:
1163 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1164 "Deleting vnfd={} at RO".format(vnf_id)
1165 await RO.delete("vnfd", RO_vnfd_id)
1166 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1167 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1168 except ROclient.ROClientException as e:
1169 if e.http_code == 404: # not found
1170 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1171 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1172 elif e.http_code == 409: # conflict
1173 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1174 self.logger.debug(logging_text + failed_detail[-1])
1175 else:
1176 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1177 self.logger.error(logging_text + failed_detail[-1])
1178
1179 if vca_task_list:
1180 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1181 "Waiting for deletion of configuration charms"
1182 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1183 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1184 await asyncio.wait(vca_task_list, timeout=300)
1185 for application_name, task in vca_task_dict.items():
1186 if task.cancelled():
1187 failed_detail.append("VCA[{}] Deletion has been cancelled".format(application_name))
1188 elif task.done():
1189 exc = task.exception()
1190 if exc:
1191 failed_detail.append("VCA[{}] Deletion exception: {}".format(application_name, exc))
1192 else:
1193 db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = None
1194 else: # timeout
1195 # TODO Should it be cancelled?!!
1196 task.cancel()
1197 failed_detail.append("VCA[{}] Deletion timeout".format(application_name))
1198
1199 if failed_detail:
1200 self.logger.error(logging_text + " ;".join(failed_detail))
1201 db_nsr_update["operational-status"] = "failed"
1202 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
1203 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
1204 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1205 db_nslcmop_update["statusEnteredTime"] = time()
1206 elif db_nslcmop["operationParams"].get("autoremove"):
1207 self.db.del_one("nsrs", {"_id": nsr_id})
1208 db_nsr_update.clear()
1209 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1210 nslcmop_operation_state = "COMPLETED"
1211 db_nslcmop_update.clear()
1212 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1213 self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
1214 {"_admin.usageSate": "NOT_IN_USE", "_admin.usage": None})
1215 self.logger.debug(logging_text + "Delete from database")
1216 else:
1217 db_nsr_update["operational-status"] = "terminated"
1218 db_nsr_update["detailed-status"] = "Done"
1219 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
1220 db_nslcmop_update["detailed-status"] = "Done"
1221 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1222 db_nslcmop_update["statusEnteredTime"] = time()
1223
1224 except (ROclient.ROClientException, DbException) as e:
1225 self.logger.error(logging_text + "Exit Exception {}".format(e))
1226 exc = e
1227 except asyncio.CancelledError:
1228 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1229 exc = "Operation was cancelled"
1230 except Exception as e:
1231 exc = traceback.format_exc()
1232 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1233 finally:
1234 if exc and db_nslcmop:
1235 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1236 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1237 db_nslcmop_update["statusEnteredTime"] = time()
1238 if db_nslcmop_update:
1239 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1240 if db_nsr:
1241 db_nsr_update["_admin.nslcmop"] = None
1242 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1243 if nslcmop_operation_state:
1244 try:
1245 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1246 "operationState": nslcmop_operation_state})
1247 except Exception as e:
1248 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1249 self.logger.debug(logging_text + "Exit")
1250 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1251
1252 async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, primitive, primitive_params):
1253
1254 vdu_id_text = "vnfd"
1255 if vdu_id:
1256 vdu_id_text = vdu_id
1257 application_name = self.n2vc.FormatApplicationName(
1258 nsr_name,
1259 member_vnf_index,
1260 vdu_id_text
1261 )
1262 vca_deployed = db_deployed["VCA"].get(application_name)
1263 if not vca_deployed:
1264 raise LcmException("charm for member_vnf_index={} vdu_id={} is not deployed".format(member_vnf_index,
1265 vdu_id))
1266 model_name = vca_deployed.get("model")
1267 application_name = vca_deployed.get("application")
1268 if not model_name or not application_name:
1269 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index))
1270 if vca_deployed["operational-status"] != "active":
1271 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1272 member_vnf_index, vca_deployed["operational-status"]))
1273 callback = None # self.n2vc_callback
1274 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1275 await self.n2vc.login()
1276 task = asyncio.ensure_future(
1277 self.n2vc.ExecutePrimitive(
1278 model_name,
1279 application_name,
1280 primitive, callback,
1281 *callback_args,
1282 **primitive_params
1283 )
1284 )
1285 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1286 # db_nsr, db_nslcmop, member_vnf_index))
1287 # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
1288 # wait until completed with timeout
1289 await asyncio.wait((task,), timeout=600)
1290
1291 result = "FAILED" # by default
1292 result_detail = ""
1293 if task.cancelled():
1294 result_detail = "Task has been cancelled"
1295 elif task.done():
1296 exc = task.exception()
1297 if exc:
1298 result_detail = str(exc)
1299 else:
1300 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1301 result = "COMPLETED"
1302 result_detail = "Done"
1303 else: # timeout
1304 # TODO Should it be cancelled?!!
1305 task.cancel()
1306 result_detail = "timeout"
1307 return result, result_detail
1308
1309 async def action(self, nsr_id, nslcmop_id):
1310 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1311 self.logger.debug(logging_text + "Enter")
1312 # get all needed from database
1313 db_nsr = None
1314 db_nslcmop = None
1315 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1316 db_nslcmop_update = {}
1317 nslcmop_operation_state = None
1318 exc = None
1319 try:
1320 step = "Getting information from database"
1321 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1322 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1323 nsr_lcm = db_nsr["_admin"].get("deployed")
1324 nsr_name = db_nsr["name"]
1325 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1326 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1327
1328 # look if previous tasks in process
1329 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1330 if task_dependency:
1331 step = db_nslcmop_update["detailed-status"] = \
1332 "Waiting for related tasks to be completed: {}".format(task_name)
1333 self.logger.debug(logging_text + step)
1334 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1335 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1336 if pending:
1337 raise LcmException("Timeout waiting related tasks to be completed")
1338
1339 # TODO check if ns is in a proper status
1340 primitive = db_nslcmop["operationParams"]["primitive"]
1341 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1342 result, result_detail = await self._ns_execute_primitive(nsr_lcm, nsr_name, vnf_index, vdu_id, primitive,
1343 primitive_params)
1344 db_nslcmop_update["detailed-status"] = result_detail
1345 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1346 db_nslcmop_update["statusEnteredTime"] = time()
1347 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1348 return # database update is called inside finally
1349
1350 except (DbException, LcmException) as e:
1351 self.logger.error(logging_text + "Exit Exception {}".format(e))
1352 exc = e
1353 except asyncio.CancelledError:
1354 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1355 exc = "Operation was cancelled"
1356 except Exception as e:
1357 exc = traceback.format_exc()
1358 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1359 finally:
1360 if exc and db_nslcmop:
1361 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1362 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1363 db_nslcmop_update["statusEnteredTime"] = time()
1364 if db_nslcmop_update:
1365 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1366 if db_nsr:
1367 db_nsr_update["_admin.nslcmop"] = None
1368 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1369 self.logger.debug(logging_text + "Exit")
1370 if nslcmop_operation_state:
1371 try:
1372 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1373 "operationState": nslcmop_operation_state})
1374 except Exception as e:
1375 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1376 self.logger.debug(logging_text + "Exit")
1377 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1378
1379 async def scale(self, nsr_id, nslcmop_id):
1380 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1381 self.logger.debug(logging_text + "Enter")
1382 # get all needed from database
1383 db_nsr = None
1384 db_nslcmop = None
1385 db_nslcmop_update = {}
1386 nslcmop_operation_state = None
1387 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1388 exc = None
1389 # in case of error, indicates what part of scale was failed to put nsr at error status
1390 scale_process = None
1391 old_operational_status = ""
1392 old_config_status = ""
1393 try:
1394 step = "Getting nslcmop from database"
1395 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1396 step = "Getting nsr from database"
1397 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1398 old_operational_status = db_nsr["operational-status"]
1399 old_config_status = db_nsr["config-status"]
1400
1401 # look if previous tasks in process
1402 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1403 if task_dependency:
1404 step = db_nslcmop_update["detailed-status"] = \
1405 "Waiting for related tasks to be completed: {}".format(task_name)
1406 self.logger.debug(logging_text + step)
1407 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1408 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1409 if pending:
1410 raise LcmException("Timeout waiting related tasks to be completed")
1411
1412 step = "Parsing scaling parameters"
1413 db_nsr_update["operational-status"] = "scaling"
1414 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1415 nsr_lcm = db_nsr["_admin"].get("deployed")
1416 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
1417 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1418 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1419 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1420 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1421
1422 step = "Getting vnfr from database"
1423 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1424 step = "Getting vnfd from database"
1425 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1426 step = "Getting scaling-group-descriptor"
1427 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1428 if scaling_descriptor["name"] == scaling_group:
1429 break
1430 else:
1431 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1432 "at vnfd:scaling-group-descriptor".format(scaling_group))
1433 # cooldown_time = 0
1434 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1435 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1436 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1437 # break
1438
1439 # TODO check if ns is in a proper status
1440 step = "Sending scale order to RO"
1441 nb_scale_op = 0
1442 if not db_nsr["_admin"].get("scaling-group"):
1443 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1444 admin_scale_index = 0
1445 else:
1446 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1447 if admin_scale_info["name"] == scaling_group:
1448 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1449 break
1450 else: # not found, set index one plus last element and add new entry with the name
1451 admin_scale_index += 1
1452 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1453 RO_scaling_info = []
1454 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1455 if scaling_type == "SCALE_OUT":
1456 # count if max-instance-count is reached
1457 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1458 max_instance_count = int(scaling_descriptor["max-instance-count"])
1459 if nb_scale_op >= max_instance_count:
1460 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1461 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1462 nb_scale_op = nb_scale_op + 1
1463 vdu_scaling_info["scaling_direction"] = "OUT"
1464 vdu_scaling_info["vdu-create"] = {}
1465 for vdu_scale_info in scaling_descriptor["vdu"]:
1466 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1467 "type": "create", "count": vdu_scale_info.get("count", 1)})
1468 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1469 elif scaling_type == "SCALE_IN":
1470 # count if min-instance-count is reached
1471 min_instance_count = 0
1472 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1473 min_instance_count = int(scaling_descriptor["min-instance-count"])
1474 if nb_scale_op <= min_instance_count:
1475 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1476 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1477 nb_scale_op = nb_scale_op - 1
1478 vdu_scaling_info["scaling_direction"] = "IN"
1479 vdu_scaling_info["vdu-delete"] = {}
1480 for vdu_scale_info in scaling_descriptor["vdu"]:
1481 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1482 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1483 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1484
1485 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1486 vdu_create = vdu_scaling_info.get("vdu-create")
1487 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1488 if vdu_scaling_info["scaling_direction"] == "IN":
1489 for vdur in reversed(db_vnfr["vdur"]):
1490 if vdu_delete.get(vdur["vdu-id-ref"]):
1491 vdu_delete[vdur["vdu-id-ref"]] -= 1
1492 vdu_scaling_info["vdu"].append({
1493 "name": vdur["name"],
1494 "vdu_id": vdur["vdu-id-ref"],
1495 "interface": []
1496 })
1497 for interface in vdur["interfaces"]:
1498 vdu_scaling_info["vdu"][-1]["interface"].append({
1499 "name": interface["name"],
1500 "ip_address": interface["ip-address"],
1501 "mac_address": interface.get("mac-address"),
1502 })
1503 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1504
1505 # execute primitive service PRE-SCALING
1506 step = "Executing pre-scale vnf-config-primitive"
1507 if scaling_descriptor.get("scaling-config-action"):
1508 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1509 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1510 and scaling_type == "SCALE_IN":
1511 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1512 step = db_nslcmop_update["detailed-status"] = \
1513 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1514 # look for primitive
1515 primitive_params = {}
1516 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1517 if config_primitive["name"] == vnf_config_primitive:
1518 for parameter in config_primitive.get("parameter", ()):
1519 if 'default-value' in parameter and \
1520 parameter['default-value'] == "<VDU_SCALE_INFO>":
1521 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1522 default_flow_style=True,
1523 width=256)
1524 break
1525 else:
1526 raise LcmException(
1527 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1528 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1529 "primitive".format(scaling_group, config_primitive))
1530 scale_process = "VCA"
1531 db_nsr_update["config-status"] = "configuring pre-scaling"
1532 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1533 vnf_config_primitive, primitive_params)
1534 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1535 vnf_config_primitive, result, result_detail))
1536 if result == "FAILED":
1537 raise LcmException(result_detail)
1538 db_nsr_update["config-status"] = old_config_status
1539 scale_process = None
1540
1541 if RO_scaling_info:
1542 scale_process = "RO"
1543 RO = ROclient.ROClient(self.loop, **self.ro_config)
1544 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1545 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1546 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1547 # wait until ready
1548 RO_nslcmop_id = RO_desc["instance_action_id"]
1549 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1550
1551 RO_task_done = False
1552 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1553 detailed_status_old = None
1554 self.logger.debug(logging_text + step)
1555
1556 deployment_timeout = 1 * 3600 # One hour
1557 while deployment_timeout > 0:
1558 if not RO_task_done:
1559 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1560 extra_item_id=RO_nslcmop_id)
1561 ns_status, ns_status_info = RO.check_action_status(desc)
1562 if ns_status == "ERROR":
1563 raise ROclient.ROClientException(ns_status_info)
1564 elif ns_status == "BUILD":
1565 detailed_status = step + "; {}".format(ns_status_info)
1566 elif ns_status == "ACTIVE":
1567 RO_task_done = True
1568 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1569 self.logger.debug(logging_text + step)
1570 else:
1571 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1572 else:
1573 desc = await RO.show("ns", RO_nsr_id)
1574 ns_status, ns_status_info = RO.check_ns_status(desc)
1575 if ns_status == "ERROR":
1576 raise ROclient.ROClientException(ns_status_info)
1577 elif ns_status == "BUILD":
1578 detailed_status = step + "; {}".format(ns_status_info)
1579 elif ns_status == "ACTIVE":
1580 step = detailed_status = "Waiting for management IP address reported by the VIM"
1581 try:
1582 desc = await RO.show("ns", RO_nsr_id)
1583 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
1584 break
1585 except ROclient.ROClientException as e:
1586 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
1587 raise e
1588 else:
1589 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1590 if detailed_status != detailed_status_old:
1591 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1592 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1593
1594 await asyncio.sleep(5, loop=self.loop)
1595 deployment_timeout -= 5
1596 if deployment_timeout <= 0:
1597 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1598
1599 step = "Updating VNFRs"
1600 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1601 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1602
1603 # update VDU_SCALING_INFO with the obtained ip_addresses
1604 if vdu_scaling_info["scaling_direction"] == "OUT":
1605 for vdur in reversed(db_vnfr["vdur"]):
1606 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1607 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1608 vdu_scaling_info["vdu"].append({
1609 "name": vdur["name"],
1610 "vdu_id": vdur["vdu-id-ref"],
1611 "interface": []
1612 })
1613 for interface in vdur["interfaces"]:
1614 vdu_scaling_info["vdu"][-1]["interface"].append({
1615 "name": interface["name"],
1616 "ip_address": interface["ip-address"],
1617 "mac_address": interface.get("mac-address"),
1618 })
1619 del vdu_scaling_info["vdu-create"]
1620
1621 scale_process = None
1622 if db_nsr_update:
1623 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1624
1625 # execute primitive service POST-SCALING
1626 step = "Executing post-scale vnf-config-primitive"
1627 if scaling_descriptor.get("scaling-config-action"):
1628 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1629 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1630 and scaling_type == "SCALE_OUT":
1631 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1632 step = db_nslcmop_update["detailed-status"] = \
1633 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1634 # look for primitive
1635 primitive_params = {}
1636 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1637 if config_primitive["name"] == vnf_config_primitive:
1638 for parameter in config_primitive.get("parameter", ()):
1639 if 'default-value' in parameter and \
1640 parameter['default-value'] == "<VDU_SCALE_INFO>":
1641 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1642 default_flow_style=True,
1643 width=256)
1644 break
1645 else:
1646 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1647 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1648 "match any vnf-configuration:config-primitive".format(scaling_group,
1649 config_primitive))
1650 scale_process = "VCA"
1651 db_nsr_update["config-status"] = "configuring post-scaling"
1652
1653 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1654 vnf_config_primitive, primitive_params)
1655 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1656 vnf_config_primitive, result, result_detail))
1657 if result == "FAILED":
1658 raise LcmException(result_detail)
1659 db_nsr_update["config-status"] = old_config_status
1660 scale_process = None
1661
1662 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1663 db_nslcmop_update["statusEnteredTime"] = time()
1664 db_nslcmop_update["detailed-status"] = "done"
1665 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1666 db_nsr_update["operational-status"] = old_operational_status
1667 db_nsr_update["config-status"] = old_config_status
1668 return
1669 except (ROclient.ROClientException, DbException, LcmException) as e:
1670 self.logger.error(logging_text + "Exit Exception {}".format(e))
1671 exc = e
1672 except asyncio.CancelledError:
1673 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1674 exc = "Operation was cancelled"
1675 except Exception as e:
1676 exc = traceback.format_exc()
1677 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1678 finally:
1679 if exc:
1680 if db_nslcmop:
1681 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1682 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1683 db_nslcmop_update["statusEnteredTime"] = time()
1684 if db_nsr:
1685 db_nsr_update["operational-status"] = old_operational_status
1686 db_nsr_update["config-status"] = old_config_status
1687 db_nsr_update["detailed-status"] = ""
1688 db_nsr_update["_admin.nslcmop"] = None
1689 if scale_process:
1690 if "VCA" in scale_process:
1691 db_nsr_update["config-status"] = "failed"
1692 if "RO" in scale_process:
1693 db_nsr_update["operational-status"] = "failed"
1694 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1695 exc)
1696 if db_nslcmop_update:
1697 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1698 if db_nsr:
1699 db_nsr_update["_admin.nslcmop"] = None
1700 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1701 if nslcmop_operation_state:
1702 try:
1703 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1704 "operationState": nslcmop_operation_state})
1705 # if cooldown_time:
1706 # await asyncio.sleep(cooldown_time)
1707 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1708 except Exception as e:
1709 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1710 self.logger.debug(logging_text + "Exit")
1711 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")