bug 585. Adding vdu-name/count-index at VCA deployment information
[osm/LCM.git] / osm_lcm / ns.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import logging
7 import logging.handlers
8 import functools
9 import traceback
10
11 import ROclient
12 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
13
14 from osm_common.dbbase import DbException
15 from osm_common.fsbase import FsException
16 from n2vc.vnf import N2VC
17
18 from copy import copy, deepcopy
19 from http import HTTPStatus
20 from time import time
21 from uuid import uuid4
22
23 __author__ = "Alfonso Tierno"
24
25
def get_iterable(in_dict, in_key):
    """
    Fetch in_dict[in_key] so that it can always be iterated: any missing or falsy value (None, False, empty
    container, ...) is replaced by an empty tuple
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if it is falsy or not present
    """
    return in_dict.get(in_key) or ()
36
37
def populate_dict(target_dict, key_list, value):
    """
    Walk target_dict following key_list, creating the intermediate dictionaries that do not exist yet, and
    assign value to the last key of key_list.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed (modified in place)
    :param key_list: list of keys to insert at target_dict
    :param value: content stored under the last key
    :return: None
    """
    node = target_dict
    for parent_key in key_list[:-1]:
        # reuse the nested dictionary if it is already there, otherwise create it
        node = node.setdefault(parent_key, {})
    node[key_list[-1]] = value
52
53
54 class NsLcm(LcmBase):
55 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
56 total_deploy_timeout = 2 * 3600 # global timeout for deployment
57
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init. Keeps the objects received and creates the N2VC client
    :param db: database object, passed to the LcmBase constructor
    :param msg: messaging object, passed to the LcmBase constructor
    :param fs: filesystem storage object, passed to the LcmBase constructor
    :param lcm_tasks: tasks registry, stored at self.lcm_tasks
    :param ro_config: RO configuration, stored at self.ro_config
    :param vca_config: dictionary with the keys 'host', 'port', 'user' and 'secret' used to create N2VC
    :param loop: asyncio loop, stored at self.loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_arguments = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_arguments)
85
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor for RO out of an OSM IM vnfd. The input vnfd is left untouched
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: deep copy of vnfd without "_id", "_admin", configuration, monitoring and scaling sections, and with
        every vdu "cloud-init-file" replaced by a "cloud-init" entry holding the content of that file
    :raises LcmException: if the storage raises FsException while opening/reading a cloud-init file
    """
    cloud_init_fd = None
    try:
        ro_vnfd = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_vnfd.pop(internal_key, None)
        if new_id:
            ro_vnfd["id"] = new_id
        for vdu in ro_vnfd.get("vdu", ()):
            if "cloud-init-file" not in vdu:
                continue
            # "_admin" was dropped from the copy, so the storage info is taken from the input vnfd
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu["cloud-init-file"]
            )
            cloud_init_fd = self.fs.file_open(cloud_init_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            # convert to base 64 or similar
            cloud_init_text = cloud_init_fd.read()
            cloud_init_fd.close()
            # mark as closed, so that the finally clause does not close it again
            cloud_init_fd = None
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_text
        # remove sections unused by RO: configuration, monitoring, scaling
        for unused_key in ("vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            ro_vnfd.pop(unused_key, None)
        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        if cloud_init_fd:
            cloud_init_fd.close()
126
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list where every item is either empty (skipped) or a dictionary with, at least,
            {model: <model>, application: <application>, operational-status: <status>, detailed-status: <text>}
        db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Any Exception raised while processing is logged as critical and not propagated
    """
    # Assigned before the try: the except handler below needs it even when n2vc_info lacks some key
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                 application_name)
        # look for the entry of this application; vca_index/vca_deployed keep the match after the break
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return
        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                vca_deployed['operational-status'] = "error"
                db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                vca_deployed['detailed-status'] = "Task Cancelled"
                db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    vca_deployed['operational-status'] = "error"
                    db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                    vca_deployed['detailed-status'] = str(exc)
                    db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            vca_deployed['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
            vca_deployed['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
        else:
            # no exception is being handled here, so no exc_info is attached to the record
            self.logger.critical(logging_text + " Enter with bad parameters")
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
204
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor; its "constituent-vnfd" list is used to map member-vnf-index to vnfd
    :param vnfd_dict: dictionary with the vnfd content, indexed by the "vnfd-id-ref" used at nsd
    :param n2vc_key_list: value stored as "mgmt_keys" at every vdu that needs to be accessed for configuration.
        If empty, no "mgmt_keys" are added
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a vim account is not ENABLED or an instantiation parameter references something
        not present at the descriptors
    """
    vim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # returns the RO id of the vim account; results are cached at vim_2_RO to read database only once
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile with the keys/values renamed to the ones written below for RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # get_iterable: a vdu without "interface" is just skipped instead of iterating None
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            # management cp located; stop looking for it at the following vdus
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in get_iterable(nsd, "constituent-vnfd"):
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in get_iterable(vnf_descriptor, "vdu"):
                    for vdu_interface in get_iterable(vdu_descriptor, "interface"):
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu has an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in get_iterable(nsd, "constituent-vnfd"):
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in get_iterable(vnf_descriptor, "vdu"):
                    for interface_descriptor in get_iterable(vdu_descriptor, "interface"):
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor/interface_descriptor keep here the match found by the loops above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
418
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, updating both db_vnfr["vdur"] and the "vnfrs" database entry
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary {vdu-id-ref: number of vdur to add}. New entries are deep copies of an existing
        vdur with the same vdu-id-ref, with a new "_id" and an incremented "count-index". Not modified
    :param vdu_delete: dictionary {vdu-id-ref: number of vdur to remove}. Not modified
    :return: None
    :raises LcmException: if there are not enough existing vdur with the requested vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk the list backwards, because items are inserted/deleted while iterating
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # every new vdur is a copy of the previous one, so count-index keeps increasing
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Anything left at these dictionaries could not be applied
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
457
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the info that RO reports for its net
    :param ns_update_nsr: dictionary to be filled with the updated info, with keys "vld.<index>"
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised if a vld has no net at the RO info
    """
    # (key at nsr vld, key at RO net)
    vld_key_2_RO_key = (
        ("vim-id", "vim_net_id"),
        ("name", "vim_name"),
        ("status", "status"),
        ("status-detailed", "error_msg"),
    )
    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        for ro_net in get_iterable(nsr_desc_RO, "nets"):
            if ns_vld["id"] == ro_net.get("ns_net_osm_id"):
                for vld_key, ro_key in vld_key_2_RO_key:
                    ns_vld[vld_key] = ro_net.get(ro_key)
                ns_update_nsr["vld.{}".format(position)] = ns_vld
                break
        else:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))
479
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when neither RO nor the
        vnfr content have an ip address for a vnf
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf of this member index; the 'else' of this loop means that it was not found
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms with the same vdu id seen so far, to pair them with "count-index"
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        # interfaces are matched by name against the RO "internal_name"
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm with this vdu id and count index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            # write the changes collected for this vnfr and go for the next one
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
547
548 async def instantiate(self, nsr_id, nslcmop_id):
549 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
550 self.logger.debug(logging_text + "Enter")
551 # get all needed from database
552 start_deploy = time()
553 db_nsr = None
554 db_nslcmop = None
555 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
556 db_nslcmop_update = {}
557 nslcmop_operation_state = None
558 db_vnfrs = {}
559 RO_descriptor_number = 0 # number of descriptors created at RO
560 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
561 n2vc_info = {}
562 exc = None
563 try:
564 step = "Getting nslcmop={} from db".format(nslcmop_id)
565 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
566 step = "Getting nsr={} from db".format(nsr_id)
567 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
568 ns_params = db_nslcmop.get("operationParams")
569 nsd = db_nsr["nsd"]
570 nsr_name = db_nsr["name"] # TODO short-name??
571
572 # look if previous tasks in process
573 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
574 if task_dependency:
575 step = db_nslcmop_update["detailed-status"] = \
576 "Waiting for related tasks to be completed: {}".format(task_name)
577 self.logger.debug(logging_text + step)
578 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
579 _, pending = await asyncio.wait(task_dependency, timeout=3600)
580 if pending:
581 raise LcmException("Timeout waiting related tasks to be completed")
582
583 step = "Getting vnfrs from db"
584 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
585 db_vnfds_ref = {}
586 db_vnfds = {}
587 for vnfr in db_vnfrs_list:
588 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
589 vnfd_id = vnfr["vnfd-id"]
590 vnfd_ref = vnfr["vnfd-ref"]
591 if vnfd_id not in db_vnfds:
592 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
593 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
594 db_vnfds_ref[vnfd_ref] = vnfd
595 db_vnfds[vnfd_id] = vnfd
596
597 # Get or generates the _admin.deployed,VCA list
598 vca_deployed_list = None
599 if db_nsr["_admin"].get("deployed"):
600 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
601 if vca_deployed_list is None:
602 vca_deployed_list = []
603 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
604 elif isinstance(vca_deployed_list, dict):
605 # maintain backward compatibility. Change a dict to list at database
606 vca_deployed_list = list(vca_deployed_list.values())
607 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
608
609 db_nsr_update["detailed-status"] = "creating"
610 db_nsr_update["operational-status"] = "init"
611
612 RO = ROclient.ROClient(self.loop, **self.ro_config)
613
614 # get vnfds, instantiate at RO
615 for vnfd_id, vnfd in db_vnfds.items():
616 vnfd_ref = vnfd["id"]
617 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
618 # self.logger.debug(logging_text + step)
619 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
620 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
621 RO_descriptor_number += 1
622
623 # look if present
624 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
625 if vnfd_list:
626 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
627 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
628 vnfd_ref, vnfd_list[0]["uuid"]))
629 else:
630 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
631 desc = await RO.create("vnfd", descriptor=vnfd_RO)
632 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
633 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
634 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
635 vnfd_ref, desc["uuid"]))
636 self.update_db_2("nsrs", nsr_id, db_nsr_update)
637
638 # create nsd at RO
639 nsd_ref = nsd["id"]
640 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
641 # self.logger.debug(logging_text + step)
642
643 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
644 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
645 RO_descriptor_number += 1
646 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
647 if nsd_list:
648 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
649 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
650 nsd_ref, RO_nsd_uuid))
651 else:
652 nsd_RO = deepcopy(nsd)
653 nsd_RO["id"] = RO_osm_nsd_id
654 nsd_RO.pop("_id", None)
655 nsd_RO.pop("_admin", None)
656 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
657 vnfd_id = c_vnf["vnfd-id-ref"]
658 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
659 desc = await RO.create("nsd", descriptor=nsd_RO)
660 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
661 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
662 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
663 self.update_db_2("nsrs", nsr_id, db_nsr_update)
664
665 # Crate ns at RO
666 # if present use it unless in error status
667 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
668 if RO_nsr_id:
669 try:
670 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
671 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
672 desc = await RO.show("ns", RO_nsr_id)
673 except ROclient.ROClientException as e:
674 if e.http_code != HTTPStatus.NOT_FOUND:
675 raise
676 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
677 if RO_nsr_id:
678 ns_status, ns_status_info = RO.check_ns_status(desc)
679 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
680 if ns_status == "ERROR":
681 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
682 self.logger.debug(logging_text + step)
683 await RO.delete("ns", RO_nsr_id)
684 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
685 if not RO_nsr_id:
686 step = db_nsr_update["detailed-status"] = "Checking dependencies"
687 # self.logger.debug(logging_text + step)
688
689 # check if VIM is creating and wait look if previous tasks in process
690 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
691 if task_dependency:
692 step = "Waiting for related tasks to be completed: {}".format(task_name)
693 self.logger.debug(logging_text + step)
694 await asyncio.wait(task_dependency, timeout=3600)
695 if ns_params.get("vnf"):
696 for vnf in ns_params["vnf"]:
697 if "vimAccountId" in vnf:
698 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
699 vnf["vimAccountId"])
700 if task_dependency:
701 step = "Waiting for related tasks to be completed: {}".format(task_name)
702 self.logger.debug(logging_text + step)
703 await asyncio.wait(task_dependency, timeout=3600)
704
705 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
706
707 # feature 1429. Add n2vc public key to needed VMs
708 n2vc_key = await self.n2vc.GetPublicKey()
709 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
710
711 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
712 desc = await RO.create("ns", descriptor=RO_ns_params,
713 name=db_nsr["name"],
714 scenario=RO_nsd_uuid)
715 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
716 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
717 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
718 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
719 self.update_db_2("nsrs", nsr_id, db_nsr_update)
720
721 # wait until NS is ready
722 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
723 detailed_status_old = None
724 self.logger.debug(logging_text + step)
725
726 while time() <= start_deploy + self.total_deploy_timeout:
727 desc = await RO.show("ns", RO_nsr_id)
728 ns_status, ns_status_info = RO.check_ns_status(desc)
729 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
730 if ns_status == "ERROR":
731 raise ROclient.ROClientException(ns_status_info)
732 elif ns_status == "BUILD":
733 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
734 elif ns_status == "ACTIVE":
735 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
736 try:
737 self.ns_update_vnfr(db_vnfrs, desc)
738 break
739 except LcmExceptionNoMgmtIP:
740 pass
741 else:
742 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
743 if detailed_status != detailed_status_old:
744 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
745 self.update_db_2("nsrs", nsr_id, db_nsr_update)
746 await asyncio.sleep(5, loop=self.loop)
747 else: # total_deploy_timeout
748 raise ROclient.ROClientException("Timeout waiting ns to be ready")
749
750 step = "Updating NSR"
751 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
752
753 db_nsr["detailed-status"] = "Configuring vnfr"
754 self.update_db_2("nsrs", nsr_id, db_nsr_update)
755
756 # The parameters we'll need to deploy a charm
757 number_to_configure = 0
758
def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, mgmt_ip_address, n2vc_info,
                 config_primitive=None):
    """Schedule the deployment of one proxy charm, for either a vnf or a vdu configuration.

    This is a closure: apart from its parameters it reads vnfd, vnfd_id, proxy_charm, nsr_name, nsr_id,
    nslcmop_id, vca_deployed_list and db_nsr_update from the enclosing scope, using the values they have at
    call time.
    A new entry is appended to vca_deployed_list and written to the "nsrs" database record (key
    "_admin.deployed.VCA.<index>") before the N2VC DeployCharms task is created. The task is not awaited here;
    it is registered at self.lcm_tasks with the name "create_charm:<application_name>".

    :param vnf_index: member-vnf-index of the vnf. Mandatory
    :param vdu_id: id of the vdu for a vdu configuration, or None for a vnf configuration
    :param vdu_name: stored at the VCA entry as "vdu_name"
    :param vdu_count_index: stored at the VCA entry as "vdu_count_index"
    :param mgmt_ip_address: management ip address, passed to the charm as the 'rw_mgmt_ip' parameter
    :param n2vc_info: dictionary used as callback parameter. When empty it is filled in here (nsr_id,
        nslcmop_id, n2vc_event, lcmOperationType, deployed, db_update)
    :param config_primitive: if not empty, passed to the charm as the 'initial-config-primitive' parameter
    :return: None
    :raises LcmException: if mgmt_ip_address is empty
    """
    if not mgmt_ip_address:
        raise LcmException("vnfd/vdu has not management ip address to configure it")
    # Login to the VCA.
    # if number_to_configure == 0:
    #     self.logger.debug("Logging into N2VC...")
    #     task = asyncio.ensure_future(self.n2vc.login())
    #     yield from asyncio.wait_for(task, 30.0)
    #     self.logger.debug("Logged into N2VC!")

    # # await self.n2vc.login()

    # Note: The charm needs to exist on disk at the location
    # specified by charm_path.
    base_folder = vnfd["_admin"]["storage"]
    storage_params = self.fs.get_params()
    charm_path = "{}{}/{}/charms/{}".format(
        storage_params["path"],
        base_folder["folder"],
        base_folder["pkg-dir"],
        proxy_charm
    )

    # Setup the runtime parameters for this VNF
    params = {'rw_mgmt_ip': mgmt_ip_address}
    if config_primitive:
        params["initial-config-primitive"] = config_primitive

    # ns_name will be ignored in the current version of N2VC
    # but will be implemented for the next point release.
    model_name = 'default'  # TODO bug 581 : change to nsr_id
    if vdu_id:
        vdu_id_text = vdu_id
    else:
        vdu_id_text = "vnfd"  # TODO bug 581 remove and add just an empty string ""
    application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
    # TODO bug 581 Add "-" as a final argument

    # the position of the new entry inside vca_deployed_list is also its index at database
    vca_index = len(vca_deployed_list)
    # truncate the name and add a two-char index at the end to ensure that it is unique. It is assumed no more
    # than 26*26 charms in the same NS
    # TODO bug 581 uncomment
    # application_name = application_name[0:48]
    # application_name += chr(97 + vca_index / 26) + chr(97 + vca_index % 26)
    vca_deployed_ = {
        "member-vnf-index": vnf_index,
        "vdu_id": vdu_id,
        "model": model_name,
        "application": application_name,
        "operational-status": "init",
        "detailed-status": "",
        "vnfd_id": vnfd_id,
        "vdu_name": vdu_name,
        "vdu_count_index": vdu_count_index,
    }
    vca_deployed_list.append(vca_deployed_)
    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
                                                                                     proxy_charm))
    # n2vc_info is shared among all the charms of this operation; it is initialized only by the first call
    if not n2vc_info:
        n2vc_info["nsr_id"] = nsr_id
        n2vc_info["nslcmop_id"] = nslcmop_id
        n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
        n2vc_info["lcmOperationType"] = "instantiate"
        n2vc_info["deployed"] = vca_deployed_list
        n2vc_info["db_update"] = db_nsr_update
    task = asyncio.ensure_future(
        self.n2vc.DeployCharms(
            model_name,          # The network service name
            application_name,    # The application name
            vnfd,                # The vnf descriptor
            charm_path,          # Path to charm
            params,              # Runtime params, like mgmt ip
            {},                  # for native charms only
            self.n2vc_callback,  # Callback for status changes
            n2vc_info,           # Callback parameter
            None,                # Callback parameter (task)
        )
    )
    # the same callback is also invoked when the task finishes
    task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
                                             n2vc_info))
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
847
848 step = "Looking for needed vnfd to configure"
849 self.logger.debug(logging_text + step)
850
851 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
852 vnfd_id = c_vnf["vnfd-id-ref"]
853 vnf_index = str(c_vnf["member-vnf-index"])
854 vnfd = db_vnfds_ref[vnfd_id]
855
856 # Check if this VNF has a charm configuration
857 vnf_config = vnfd.get("vnf-configuration")
858
859 if vnf_config and vnf_config.get("juju"):
860 proxy_charm = vnf_config["juju"]["charm"]
861 config_primitive = None
862
863 if proxy_charm:
864 if 'initial-config-primitive' in vnf_config:
865 config_primitive = vnf_config['initial-config-primitive']
866
867 # Login to the VCA. If there are multiple calls to login(),
868 # subsequent calls will be a nop and return immediately.
869 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
870 await self.n2vc.login()
871 deploy_charm(vnf_index, None, None, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info,
872 config_primitive)
873 number_to_configure += 1
874
875 # Deploy charms for each VDU that supports one.
876 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
877 vdu_config = vdu.get('vdu-configuration')
878 proxy_charm = None
879 config_primitive = None
880
881 if vdu_config and vdu_config.get("juju"):
882 proxy_charm = vdu_config["juju"]["charm"]
883
884 if 'initial-config-primitive' in vdu_config:
885 config_primitive = vdu_config['initial-config-primitive']
886
887 if proxy_charm:
888 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
889 await self.n2vc.login()
890 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
891 # TODO for the moment only first vdu_id contains a charm deployed
892 if vdur["vdu-id-ref"] != vdu["id"]:
893 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
894 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
895 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
896 vdur["ip-address"], n2vc_info, config_primitive)
897 number_to_configure += 1
898
899 db_nsr_update["operational-status"] = "running"
900 configuration_failed = False
901 if number_to_configure:
902 old_status = "configuring: init: {}".format(number_to_configure)
903 db_nsr_update["config-status"] = old_status
904 db_nsr_update["detailed-status"] = old_status
905 db_nslcmop_update["detailed-status"] = old_status
906
907 # wait until all are configured.
908 while time() <= start_deploy + self.total_deploy_timeout:
909 if db_nsr_update:
910 self.update_db_2("nsrs", nsr_id, db_nsr_update)
911 if db_nslcmop_update:
912 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
913 # TODO add a fake tast that set n2vc_event after some time
914 await n2vc_info["n2vc_event"].wait()
915 n2vc_info["n2vc_event"].clear()
916 all_active = True
917 status_map = {}
918 n2vc_error_text = [] # contain text error list. If empty no one is in error status
919 now = time()
920 for vca_deployed in vca_deployed_list:
921 vca_status = vca_deployed["operational-status"]
922 if vca_status not in status_map:
923 # Initialize it
924 status_map[vca_status] = 0
925 status_map[vca_status] += 1
926
927 if vca_status == "active":
928 vca_deployed.pop("time_first_error", None)
929 vca_deployed.pop("status_first_error", None)
930 continue
931
932 all_active = False
933 if vca_status in ("error", "blocked"):
934 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
935 # if not first time in this status error
936 if not vca_deployed.get("time_first_error"):
937 vca_deployed["time_first_error"] = now
938 continue
939 if vca_deployed.get("time_first_error") and \
940 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
941 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
942 .format(vca_deployed["member-vnf-index"],
943 vca_deployed["vdu_id"], vca_status,
944 vca_deployed["detailed-status-error"]))
945
946 if all_active:
947 break
948 elif n2vc_error_text:
949 db_nsr_update["config-status"] = "failed"
950 error_text = "fail configuring " + ";".join(n2vc_error_text)
951 db_nsr_update["detailed-status"] = error_text
952 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
953 db_nslcmop_update["detailed-status"] = error_text
954 db_nslcmop_update["statusEnteredTime"] = time()
955 configuration_failed = True
956 break
957 else:
958 cs = "configuring: "
959 separator = ""
960 for status, num in status_map.items():
961 cs += separator + "{}: {}".format(status, num)
962 separator = ", "
963 if old_status != cs:
964 db_nsr_update["config-status"] = cs
965 db_nsr_update["detailed-status"] = cs
966 db_nslcmop_update["detailed-status"] = cs
967 old_status = cs
968 else: # total_deploy_timeout
969 raise LcmException("Timeout waiting ns to be configured")
970
971 if not configuration_failed:
972 # all is done
973 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
974 db_nslcmop_update["statusEnteredTime"] = time()
975 db_nslcmop_update["detailed-status"] = "done"
976 db_nsr_update["config-status"] = "configured"
977 db_nsr_update["detailed-status"] = "done"
978
979 return
980
981 except (ROclient.ROClientException, DbException, LcmException) as e:
982 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
983 exc = e
984 except asyncio.CancelledError:
985 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
986 exc = "Operation was cancelled"
987 except Exception as e:
988 exc = traceback.format_exc()
989 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
990 exc_info=True)
991 finally:
992 if exc:
993 if db_nsr:
994 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
995 db_nsr_update["operational-status"] = "failed"
996 if db_nslcmop:
997 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
998 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
999 db_nslcmop_update["statusEnteredTime"] = time()
1000 if db_nsr:
1001 db_nsr_update["_admin.nslcmop"] = None
1002 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1003 if db_nslcmop_update:
1004 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1005 if nslcmop_operation_state:
1006 try:
1007 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1008 "operationState": nslcmop_operation_state})
1009 except Exception as e:
1010 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1011
1012 self.logger.debug(logging_text + "Exit")
1013 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1014
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.

    Steps: schedule the removal of every deployed configuration charm (VCA); delete the ns, the nsd and the
    vnfds at RO; finally wait for the charm removal tasks. Partial failures are collected at failed_detail and
    reported together. The result is stored at database ("nsrs" and "nslcmops") and, when an operation state
    has been set, it is notified with the message "ns"/"terminated". Exceptions are caught and logged here,
    they are not propagated to the caller.
    If the operation parameter 'autoremove' is set and there are no failures, the nsr, its nslcmops and its
    vnfrs are deleted from database, and the pdus used by this ns are released.

    :param nsr_id: _id of the ns record ("nsrs" at database)
    :param nslcmop_id: _id of the operation record ("nslcmops" at database)
    :return: None
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None   # failure of the whole operation; evaluated at the finally clause
    failed_detail = []   # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    vca_application_name2index = {}
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # TODO ALF remove
        # db_vim = self.db.get_one("vim_accounts", {"_id":  db_nsr["datacenter"]})
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:   # TODO it would be desirable having a and deploy_info.get("deployed"):
                        task = asyncio.ensure_future(
                            self.n2vc.RemoveCharms(
                                vca_deployed['model'],
                                vca_deployed["application"],
                                # self.n2vc_callback,
                                # db_nsr,
                                # db_nslcmop,
                            )
                        )
                        vca_application_name2index[vca_deployed["application"]] = vca_index
                        vca_task_list.append(task)
                        vca_task_dict[vca_deployed["application"]] = task
                        # task.add_done_callback(functools.partial(self.n2vc_callback, vca_deployed['model'],
                        #                                          vca_deployed['application'], None, db_nsr,
                        #                                          db_nslcmop, vnf_index))
                        self.lcm_tasks.register("ns", nsr_id, nslcmop_id,
                                                "delete_charm:" + vca_deployed["application"], task)
            except Exception as e:
                # best effort: a failure scheduling the charm removal must not prevent the deletion at RO
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # the 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10 and
                    # the running loop is used by default
                    await asyncio.sleep(5)
                    delete_timeout -= 5
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait for the charm removal tasks scheduled at the beginning
        if vca_task_list:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            await asyncio.wait(vca_task_list, timeout=300)
            for application_name, task in vca_task_dict.items():
                if task.cancelled():
                    failed_detail.append("VCA[application_name={}] Deletion has been cancelled"
                                         .format(application_name))
                elif task.done():
                    # do not reuse 'exc' here: it would be evaluated at the finally clause and it would
                    # replace the complete failed_detail text by the exception of the last task
                    task_exc = task.exception()
                    if task_exc:
                        failed_detail.append("VCA[application_name={}] Deletion exception: {}"
                                             .format(application_name, task_exc))
                    else:
                        vca_index = vca_application_name2index[application_name]
                        db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                else:  # timeout
                    # TODO Should it be cancelled?!!
                    task.cancel()
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(application_name))

        if failed_detail:
            self.logger.error(logging_text + "; ".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            self.db.del_one("nsrs", {"_id": nsr_id})
            db_nsr_update.clear()
            db_nsr = None   # the record does not exist any more; it must not be updated at the finally clause
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            # NOTE(review): the key was written as "_admin.usageSate"; it is assumed that the pdu records
            # use "usageState" - confirm against the module that marks the pdus as IN_USE
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr:
            db_nsr_update["_admin.nslcmop"] = None
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1250
async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Execute a primitive at the charm deployed for a vnf or a vdu, waiting up to 600 seconds for completion.

    The charm is looked up among the entries of db_deployed["VCA"]: member_vnf_index and vdu_id must always
    match; vdu_name and vdu_count_index are only compared when they are provided.
    :param db_deployed: content of nsr "_admin.deployed"; can be None
    :param nsr_name: not used by this method
    :param member_vnf_index: member-vnf-index of the vnf
    :param vdu_id: id of the vdu, or None for the vnf charm
    :param vdu_name: name of the vdu, or None/empty to skip this filter
    :param vdu_count_index: count-index of the vdu, or None to skip this filter. 0 is a valid value
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary with the primitive parameters; can be None
    :return: tuple (result, result_detail), where result is "COMPLETED" or "FAILED"
    :raises LcmException: if the charm is not found, it has not model or application name, or it is not active
    """
    # db_deployed, or its "VCA" entry, can be missing when nothing has been deployed
    for vca_deployed in (db_deployed or {}).get("VCA") or ():
        if not vca_deployed:
            continue
        if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
            continue
        # .get is used because entries could lack the vdu_name/vdu_count_index keys
        if vdu_name and vdu_name != vca_deployed.get("vdu_name"):
            continue
        # compare against None: a count-index 0 is a valid filter and must not be skipped
        if vdu_count_index is not None and vdu_count_index != vca_deployed.get("vdu_count_index"):
            continue
        break
    else:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    model_name = vca_deployed.get("model")
    application_name = vca_deployed.get("application")
    if not model_name or not application_name:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
                           "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    if vca_deployed["operational-status"] != "active":
        raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
            member_vnf_index, vdu_id, vca_deployed["operational-status"]))
    callback = None  # self.n2vc_callback
    callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
    await self.n2vc.login()
    task = asyncio.ensure_future(
        self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive, callback,
            *callback_args,
            **(primitive_params or {})
        )
    )
    # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
    #                                          db_nsr, db_nslcmop, member_vnf_index))
    # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
    # wait until completed with timeout
    await asyncio.wait((task,), timeout=600)

    result = "FAILED"  # by default
    result_detail = ""
    if task.cancelled():
        result_detail = "Task has been cancelled"
    elif task.done():
        exc = task.exception()
        if exc:
            result_detail = str(exc)
        else:
            # TODO revise with Adam if action is finished and ok when task is done or callback is needed
            result = "COMPLETED"
            result_detail = "Done"
    else:  # timeout
        # TODO Should it be cancelled?!!
        task.cancel()
        result_detail = "timeout"
    return result, result_detail
1310
async def action(self, nsr_id, nslcmop_id):
    """
    Execute an action (a primitive) over a charm of a network service.

    The target charm (member_vnf_index, and optionally vdu_id, vdu_name, vdu_count_index) and the primitive
    with its parameters are read from the operationParams of the nslcmop. If there are related tasks in
    process, it waits for them (up to 3600 seconds) before executing the primitive.
    The result is stored at the nslcmop record and, when an operation state has been set, it is notified with
    the message "ns"/"actioned". Exceptions are caught and logged here, they are not propagated to the caller.

    :param nsr_id: _id of the ns record ("nsrs" at database)
    :param nslcmop_id: _id of the operation record ("nslcmops" at database)
    :return: None
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = db_nsr["_admin"].get("deployed")
        nsr_name = db_nsr["name"]
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # TODO check if ns is in a proper status
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        # update the step so that a failure from now on is not reported as a database or waiting error
        step = "Executing primitive '{}'".format(primitive)
        result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
                                                                 vdu_name, vdu_count_index, primitive,
                                                                 primitive_params)
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr:
            db_nsr_update["_admin.nslcmop"] = None
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # "Exit" is logged only once, after the notification has been attempted
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1383
1384 async def scale(self, nsr_id, nslcmop_id):
1385 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1386 self.logger.debug(logging_text + "Enter")
1387 # get all needed from database
1388 db_nsr = None
1389 db_nslcmop = None
1390 db_nslcmop_update = {}
1391 nslcmop_operation_state = None
1392 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1393 exc = None
1394 # in case of error, indicates what part of scale was failed to put nsr at error status
1395 scale_process = None
1396 old_operational_status = ""
1397 old_config_status = ""
1398 vnfr_scaled = False
1399 try:
1400 step = "Getting nslcmop from database"
1401 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1402 step = "Getting nsr from database"
1403 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1404 nsr_name = db_nsr["name"]
1405 old_operational_status = db_nsr["operational-status"]
1406 old_config_status = db_nsr["config-status"]
1407
1408 # look if previous tasks in process
1409 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1410 if task_dependency:
1411 step = db_nslcmop_update["detailed-status"] = \
1412 "Waiting for related tasks to be completed: {}".format(task_name)
1413 self.logger.debug(logging_text + step)
1414 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1415 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1416 if pending:
1417 raise LcmException("Timeout waiting related tasks to be completed")
1418
1419 step = "Parsing scaling parameters"
1420 db_nsr_update["operational-status"] = "scaling"
1421 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1422 nsr_deployed = db_nsr["_admin"].get("deployed")
1423 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1424 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1425 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1426 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1427 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1428
1429 step = "Getting vnfr from database"
1430 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1431 step = "Getting vnfd from database"
1432 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1433 step = "Getting scaling-group-descriptor"
1434 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1435 if scaling_descriptor["name"] == scaling_group:
1436 break
1437 else:
1438 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1439 "at vnfd:scaling-group-descriptor".format(scaling_group))
1440 # cooldown_time = 0
1441 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1442 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1443 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1444 # break
1445
1446 # TODO check if ns is in a proper status
1447 step = "Sending scale order to RO"
1448 nb_scale_op = 0
1449 if not db_nsr["_admin"].get("scaling-group"):
1450 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1451 admin_scale_index = 0
1452 else:
1453 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1454 if admin_scale_info["name"] == scaling_group:
1455 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1456 break
1457 else: # not found, set index one plus last element and add new entry with the name
1458 admin_scale_index += 1
1459 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1460 RO_scaling_info = []
1461 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1462 if scaling_type == "SCALE_OUT":
1463 # count if max-instance-count is reached
1464 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1465 max_instance_count = int(scaling_descriptor["max-instance-count"])
1466 if nb_scale_op >= max_instance_count:
1467 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1468 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1469 nb_scale_op = nb_scale_op + 1
1470 vdu_scaling_info["scaling_direction"] = "OUT"
1471 vdu_scaling_info["vdu-create"] = {}
1472 for vdu_scale_info in scaling_descriptor["vdu"]:
1473 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1474 "type": "create", "count": vdu_scale_info.get("count", 1)})
1475 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1476 elif scaling_type == "SCALE_IN":
1477 # count if min-instance-count is reached
1478 min_instance_count = 0
1479 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1480 min_instance_count = int(scaling_descriptor["min-instance-count"])
1481 if nb_scale_op <= min_instance_count:
1482 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1483 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1484 nb_scale_op = nb_scale_op - 1
1485 vdu_scaling_info["scaling_direction"] = "IN"
1486 vdu_scaling_info["vdu-delete"] = {}
1487 for vdu_scale_info in scaling_descriptor["vdu"]:
1488 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1489 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1490 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1491
1492 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1493 vdu_create = vdu_scaling_info.get("vdu-create")
1494 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1495 if vdu_scaling_info["scaling_direction"] == "IN":
1496 for vdur in reversed(db_vnfr["vdur"]):
1497 if vdu_delete.get(vdur["vdu-id-ref"]):
1498 vdu_delete[vdur["vdu-id-ref"]] -= 1
1499 vdu_scaling_info["vdu"].append({
1500 "name": vdur["name"],
1501 "vdu_id": vdur["vdu-id-ref"],
1502 "interface": []
1503 })
1504 for interface in vdur["interfaces"]:
1505 vdu_scaling_info["vdu"][-1]["interface"].append({
1506 "name": interface["name"],
1507 "ip_address": interface["ip-address"],
1508 "mac_address": interface.get("mac-address"),
1509 })
1510 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1511
1512 # execute primitive service PRE-SCALING
1513 step = "Executing pre-scale vnf-config-primitive"
1514 if scaling_descriptor.get("scaling-config-action"):
1515 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1516 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1517 and scaling_type == "SCALE_IN":
1518 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1519 step = db_nslcmop_update["detailed-status"] = \
1520 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1521 # look for primitive
1522 primitive_params = {}
1523 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1524 if config_primitive["name"] == vnf_config_primitive:
1525 for parameter in config_primitive.get("parameter", ()):
1526 if 'default-value' in parameter and \
1527 parameter['default-value'] == "<VDU_SCALE_INFO>":
1528 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1529 default_flow_style=True,
1530 width=256)
1531 break
1532 else:
1533 raise LcmException(
1534 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1535 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1536 "primitive".format(scaling_group, config_primitive))
1537 scale_process = "VCA"
1538 db_nsr_update["config-status"] = "configuring pre-scaling"
1539 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1540 None, None, None, vnf_config_primitive,
1541 primitive_params)
1542 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1543 vnf_config_primitive, result, result_detail))
1544 if result == "FAILED":
1545 raise LcmException(result_detail)
1546 db_nsr_update["config-status"] = old_config_status
1547 scale_process = None
1548
1549 if RO_scaling_info:
1550 scale_process = "RO"
1551 RO = ROclient.ROClient(self.loop, **self.ro_config)
1552 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1553 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1554 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1555 # wait until ready
1556 RO_nslcmop_id = RO_desc["instance_action_id"]
1557 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1558
1559 RO_task_done = False
1560 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1561 detailed_status_old = None
1562 self.logger.debug(logging_text + step)
1563
1564 deployment_timeout = 1 * 3600 # One hour
1565 while deployment_timeout > 0:
1566 if not RO_task_done:
1567 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1568 extra_item_id=RO_nslcmop_id)
1569 ns_status, ns_status_info = RO.check_action_status(desc)
1570 if ns_status == "ERROR":
1571 raise ROclient.ROClientException(ns_status_info)
1572 elif ns_status == "BUILD":
1573 detailed_status = step + "; {}".format(ns_status_info)
1574 elif ns_status == "ACTIVE":
1575 RO_task_done = True
1576 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1577 self.logger.debug(logging_text + step)
1578 else:
1579 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1580 else:
1581 desc = await RO.show("ns", RO_nsr_id)
1582 ns_status, ns_status_info = RO.check_ns_status(desc)
1583 if ns_status == "ERROR":
1584 raise ROclient.ROClientException(ns_status_info)
1585 elif ns_status == "BUILD":
1586 detailed_status = step + "; {}".format(ns_status_info)
1587 elif ns_status == "ACTIVE":
1588 step = detailed_status = \
1589 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1590 if not vnfr_scaled:
1591 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1592 vnfr_scaled = True
1593 try:
1594 desc = await RO.show("ns", RO_nsr_id)
1595 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1596 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1597 break
1598 except LcmExceptionNoMgmtIP:
1599 pass
1600 else:
1601 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1602 if detailed_status != detailed_status_old:
1603 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1604 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1605
1606 await asyncio.sleep(5, loop=self.loop)
1607 deployment_timeout -= 5
1608 if deployment_timeout <= 0:
1609 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1610
1611 # update VDU_SCALING_INFO with the obtained ip_addresses
1612 if vdu_scaling_info["scaling_direction"] == "OUT":
1613 for vdur in reversed(db_vnfr["vdur"]):
1614 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1615 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1616 vdu_scaling_info["vdu"].append({
1617 "name": vdur["name"],
1618 "vdu_id": vdur["vdu-id-ref"],
1619 "interface": []
1620 })
1621 for interface in vdur["interfaces"]:
1622 vdu_scaling_info["vdu"][-1]["interface"].append({
1623 "name": interface["name"],
1624 "ip_address": interface["ip-address"],
1625 "mac_address": interface.get("mac-address"),
1626 })
1627 del vdu_scaling_info["vdu-create"]
1628
1629 scale_process = None
1630 if db_nsr_update:
1631 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1632
1633 # execute primitive service POST-SCALING
1634 step = "Executing post-scale vnf-config-primitive"
1635 if scaling_descriptor.get("scaling-config-action"):
1636 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1637 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1638 and scaling_type == "SCALE_OUT":
1639 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1640 step = db_nslcmop_update["detailed-status"] = \
1641 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1642 # look for primitive
1643 primitive_params = {}
1644 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1645 if config_primitive["name"] == vnf_config_primitive:
1646 for parameter in config_primitive.get("parameter", ()):
1647 if 'default-value' in parameter and \
1648 parameter['default-value'] == "<VDU_SCALE_INFO>":
1649 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1650 default_flow_style=True,
1651 width=256)
1652 break
1653 else:
1654 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1655 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1656 "match any vnf-configuration:config-primitive".format(scaling_group,
1657 config_primitive))
1658 scale_process = "VCA"
1659 db_nsr_update["config-status"] = "configuring post-scaling"
1660
1661 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1662 None, None, None, vnf_config_primitive,
1663 primitive_params)
1664 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1665 vnf_config_primitive, result, result_detail))
1666 if result == "FAILED":
1667 raise LcmException(result_detail)
1668 db_nsr_update["config-status"] = old_config_status
1669 scale_process = None
1670
1671 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1672 db_nslcmop_update["statusEnteredTime"] = time()
1673 db_nslcmop_update["detailed-status"] = "done"
1674 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1675 db_nsr_update["operational-status"] = old_operational_status
1676 db_nsr_update["config-status"] = old_config_status
1677 return
1678 except (ROclient.ROClientException, DbException, LcmException) as e:
1679 self.logger.error(logging_text + "Exit Exception {}".format(e))
1680 exc = e
1681 except asyncio.CancelledError:
1682 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1683 exc = "Operation was cancelled"
1684 except Exception as e:
1685 exc = traceback.format_exc()
1686 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1687 finally:
1688 if exc:
1689 if db_nslcmop:
1690 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1691 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1692 db_nslcmop_update["statusEnteredTime"] = time()
1693 if db_nsr:
1694 db_nsr_update["operational-status"] = old_operational_status
1695 db_nsr_update["config-status"] = old_config_status
1696 db_nsr_update["detailed-status"] = ""
1697 db_nsr_update["_admin.nslcmop"] = None
1698 if scale_process:
1699 if "VCA" in scale_process:
1700 db_nsr_update["config-status"] = "failed"
1701 if "RO" in scale_process:
1702 db_nsr_update["operational-status"] = "failed"
1703 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1704 exc)
1705 if db_nslcmop_update:
1706 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1707 if db_nsr:
1708 db_nsr_update["_admin.nslcmop"] = None
1709 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1710 if nslcmop_operation_state:
1711 try:
1712 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1713 "operationState": nslcmop_operation_state})
1714 # if cooldown_time:
1715 # await asyncio.sleep(cooldown_time)
1716 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1717 except Exception as e:
1718 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1719 self.logger.debug(logging_text + "Exit")
1720 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")