Fix bug 954: Dump ns_config_info into a string
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
def get_iterable(in_dict, in_key):
    """
    Safe lookup meant to feed a 'for' loop: behaves as <dict>.get(), but any falsy content (missing key, None,
    False, empty list, ...) is replaced by an empty tuple
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if it is not present or its content is falsy
    """
    found = in_dict.get(in_key)
    return found if found else ()
56
57
def populate_dict(target_dict, key_list, value):
    """
    Walks target_dict following key_list, creating the missing intermediate dictionaries, and assigns value to the
    last key of key_list.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed. It is modified in place
    :param key_list: list of keys to insert at target_dict
    :param value: content assigned to the innermost key
    :return: None
    """
    node = target_dict
    # all keys but the last one are levels to descend, created on demand
    for level in key_list[:-1]:
        node = node.setdefault(level, {})
    node[key_list[-1]] = value
72
73
74 class NsLcm(LcmBase):
75 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
76 total_deploy_timeout = 2 * 3600 # global timeout for deployment
77 timeout_charm_delete = 10 * 60
78 timeout_primitive = 10 * 60 # timeout for primitive execution
79
80 SUBOPERATION_STATUS_NOT_FOUND = -1
81 SUBOPERATION_STATUS_NEW = -2
82 SUBOPERATION_STATUS_SKIP = -3
83
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init. Stores the received handlers and creates the connectors towards N2VC (juju), K8s (helm and juju) and RO
    :param db: database handler, passed to the base class and to the connectors
    :param msg: messaging handler, passed to the base class
    :param fs: filesystem storage handler, passed to the base class and to the connectors
    :param lcm_tasks: stored at self.lcm_tasks
    :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
    :param vca_config: dictionary with the VCA configuration. It is modified: content of 'pubkey', 'cacert' and
        'apiproxy' is duplicated under 'public_key', 'ca_cert' and 'api_proxy'
    :param loop: asyncio loop, passed to the N2VC connector and to the RO client
    :return: None
    """
    super().__init__(db=db, msg=msg, fs=fs, logger=logging.getLogger('lcm.ns'))

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.ro_config = ro_config
    self.vca_config = vca_config

    # expose legacy configuration keys also under the alternative names
    for legacy_key, alias_key in (('pubkey', 'public_key'), ('cacert', 'ca_cert'), ('apiproxy', 'api_proxy')):
        if legacy_key in self.vca_config:
            self.vca_config[alias_key] = self.vca_config[legacy_key]

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=self.vca_config.get('user'),
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
    )

    # create K8s connectors; both share the same kubectl binary
    kubectl_path = self.vca_config.get("kubectlpath")
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
142
def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback provided to the N2VC connector as 'on_update_db'. Currently it only writes a debug trace with the
    received arguments; the code writing the NS status to database is disabled (kept commented below)
    :param table: only logged
    :param filter: only logged
    :param path: only logged
    :param updated_data: only logged
    :return: None
    """
    trace = '_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'.format(
        table, filter, path, updated_data)
    self.logger.debug(trace)

    # write NS status to database
    # try:
    #     # nsrs_id = filter.get('_id')
    #     # print(nsrs_id)
    #     # get ns record
    #     nsr = self.db.get_one(table=table, q_filter=filter)
    #     # get VCA deployed list
    #     vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
    #     # get RO deployed
    #     # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
    #     for vca in vca_list:
    #         # status = vca.get('status')
    #         # print(status)
    #         # detailed_status = vca.get('detailed-status')
    #         # print(detailed_status)
    #     # for ro in ro_list:
    #     #    print(ro)
    #
    # except Exception as e:
    #     self.logger.error('Error writing NS status to db: {}'.format(e))
169
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates the vnfd descriptor for RO based on the input OSM IM vnfd. Keys not used by RO are removed and the
    cloud-init (inline or read from file) of every vdu is rendered with Jinja2
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as variables of the cloud-init template
    :param nsrId: Id of the NSR. Not used inside this method
    :return: modified deep copy of vnfd
    :raises LcmException: if the cloud-init file cannot be read, a template variable is not provided, or the
        Jinja2 processing fails
    """
    try:
        ro_descriptor = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            ro_descriptor.pop(unused_key, None)
        if new_id:
            ro_descriptor["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(ro_descriptor, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    raw_cloud_init = ci_file.read()
                # RO receives the content inline, so the file reference is dropped
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                raw_cloud_init = vdu["cloud-init"]
            else:
                continue

            # every variable used by the template must come in the instantiation parameters
            parsed_template = Environment().parse(raw_cloud_init)
            for needed_var in meta.find_undeclared_variables(parsed_template):
                if not additionalParams or needed_var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(needed_var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(raw_cloud_init).render(additionalParams or {})

        return ro_descriptor
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
227
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. If empty or None, None is returned
    :param nsd: ns descriptor. Its "constituent-vnfd" list is used to map member-vnf-index to vnfd
    :param vnfd_dict: dictionary of vnfd-id-ref: vnfd content, for the vnfds referenced by the nsd
    :param n2vc_key_list: content stored as "mgmt_keys" of every vdu that requires ssh access. None is treated as
        an empty list
    :return: The RO ns descriptor
    :raises LcmException: if a VIM/WIM account is not ENABLED, or an instantiation parameter references a
        member-vnf-index, internal connection point or connection point not present at the descriptors
    """
    # caches of OSM account id -> RO id, filled by the helpers below to read database only once per account
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO id stored at database '_admin.deployed.RO'
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only string values are translated using database; any other value (None, boolean, ...) is returned
        # unchanged
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile renaming keys and values to the names written below:
        # dns-server -> dns-address, ipv4/ipv6 -> IPv4/IPv6, dhcp-params -> dhcp
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # first part: look at the descriptors for the vdus that require ssh access, to inject n2vc_key_list on them
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                # management can be given as a vdu-id, or as a connection point to be resolved to a vdu below
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # NOTE(review): vdu.get("interface") is iterated without a default; a vdu without "interface"
                # would raise TypeError here - confirm descriptors always carry it
                for vdu_interface in vdu.get("interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        # clear it so that only the first vdu owning the management cp is selected
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # the same vnfd can be used by several constituent members; keys are set for each one
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    # second part: per-vnf instantiation parameters
    for vnf_params in get_iterable(ns_params, "vnf"):
        # for/else: the else branch is only reached when no constituent-vnfd matches the member-vnf-index
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                # iface_found propagates the inner 'break' to the outer loop, so that the 'else' of the outer
                # loop (error) is only executed when no interface references this internal connection point
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # third part: per-vld (ns level networks) instantiation parameters
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            # NOTE(review): the trailing comma makes this statement a one-element tuple expression. It is
            # harmless, but it looks unintended
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"])),
        if vld_params.get("vim-network-name"):
            # value can be a dictionary of vim_account: network, or a single network (no datacenter is set)
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            # NOTE(review): this writes the same "sites" key as vim-network-name above, so when both are
            # provided the content of vim-network-id replaces the previous one - confirm it is intended
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                # "use-network" holds a single entry; with several items only the last one remains
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                # after these loops vdu_descriptor and interface_descriptor keep pointing to the matching
                # elements, that are used below; match_cp propagates the inner 'break' to the outer loop
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
498
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes entries of the "vdur" list of a vnfr and writes the resulting list to database
    :param db_vnfr: vnfr content. Its "vdur" list is modified in place. Entries with "pdu-type" are ignored
    :param vdu_create: dictionary of vdu-id-ref: number of vdur entries to add. The entries are created by
        cloning the last non-PDU entry of the list with that vdu-id-ref, with a new "_id" and consecutive
        "count-index". The input dictionary is not modified
    :param vdu_delete: dictionary of vdu-id-ref: number of vdur entries to remove, starting from the end of the
        list. The input dictionary is not modified
    :return: None
    :raises LcmException: if any requested creation or deletion cannot be applied because there are not
        (enough) vdur entries with that vdu-id-ref
    """
    # make a copy to do not change the caller dictionaries; entries are consumed as they are applied
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # iterate backwards, so that insertions and deletions do not shift the positions still pending to visit
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each clone is based on the previous one, so count-index grows one by one
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each direction is checked on its own, to report the pending entries with
    # the proper message (a pending deletion was previously reported as a scaling OUT error)
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
537
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the information (vim id, name, status) of its network at RO
    :param ns_update_nsr: dictionary to be filled with the updated info, using "vld.<index>" as keys
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if a vld is not found among the RO nets
    """
    for position, nsr_vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net referencing this vld, or None
        ro_net = next((candidate for candidate in get_iterable(nsr_desc_RO, "nets")
                       if nsr_vld["id"] == candidate.get("ns_net_osm_id")), None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(nsr_vld["id"]))

        nsr_vld["vim-id"] = ro_net.get("vim_net_id")
        nsr_vld["name"] = ro_net.get("vim_name")
        nsr_vld["status"] = ro_net.get("status")
        nsr_vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = nsr_vld
559
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (matched by "member_vnf_index") and "nets" are used
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when neither RO nor the
        vnfr provide an ip address for the vnf
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # for/else: the final else (error) is only reached if no RO vnf matches this member-vnf-index
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the first address of the ';' separated list is used
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms of this same vdu already skipped; used to pair the vdur with the RO vm
                # placed at position 'count-index' among the ones of its vdu
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        # every interface of the vdur must exist at RO, matched by "internal_name"
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                # internal networks of the vnf are matched against RO nets by "vnf_net_osm_id"
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # one database write per vnfr, with all the changes collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
631
def _get_ns_config_info(self, nsr_id):
    """
    Builds the mapping between the vnf,vdu elements and their N2VC application, reading from database the
    current content of _admin.deployed.VCA of the nsr
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id>  for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id>  for a vdu configuration
        Entries without member-vnf-index are not included
    """
    nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
    element_2_application = {}
    for deployed in nsr_record["_admin"]["deployed"]["VCA"]:
        vnf_index = deployed["member-vnf-index"]
        if not vnf_index:
            continue
        vdu_id = deployed["vdu_id"]
        application = deployed["application"]
        if vdu_id:
            element = "{}.{}.{}".format(vnf_index, vdu_id, deployed["vdu_count_index"])
        else:
            element = vnf_index
        element_2_application[element] = application
    return {"osm-config-mapping": element_2_application}
653
@staticmethod
def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
    """
    Obtains the list of initial-config-primitive to execute: the ones of the descriptor plus the internal
    primitives (config, verify-ssh-credentials) when they are needed
    :param desc_primitive_list: information of the descriptor. It is not modified, a copy of the list is used
    :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
        this element contains a ssh public key
    :return: The modified list. Can be an empty list, but always a list
    """
    primitives = desc_primitive_list.copy() if desc_primitive_list else []

    # position of the first primitive named config. None if not present
    config_at = next((position for position, primitive in enumerate(primitives)
                      if primitive["name"] == "config"), None)

    if not vca_deployed["member-vnf-index"]:
        # for NS, add always a config primitive if not present (bug 874)
        if config_at is None:
            primitives.insert(0, {"name": "config", "parameter": []})
    elif config_at is not None and vca_deployed.get("ssh-public-key"):
        # for VNF/VDU add verify-ssh-credentials after config
        primitives.insert(config_at + 1, {"name": "verify-ssh-credentials", "parameter": []})
    return primitives
683
684 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
685 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
686
687 db_nsr_update = {}
688 RO_descriptor_number = 0 # number of descriptors created at RO
689 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
690 start_deploy = time()
691 vdu_flag = False # If any of the VNFDs has VDUs
692 ns_params = db_nslcmop.get("operationParams")
693
694 # deploy RO
695
696 # get vnfds, instantiate at RO
697
698 for c_vnf in nsd.get("constituent-vnfd", ()):
699 member_vnf_index = c_vnf["member-vnf-index"]
700 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
701 if vnfd.get("vdu"):
702 vdu_flag = True
703 vnfd_ref = vnfd["id"]
704 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
705 " RO".format(vnfd_ref, member_vnf_index)
706 # self.logger.debug(logging_text + step)
707 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
708 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
709 RO_descriptor_number += 1
710
711 # look position at deployed.RO.vnfd if not present it will be appended at the end
712 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
713 if vnf_deployed["member-vnf-index"] == member_vnf_index:
714 break
715 else:
716 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
717 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
718
719 # look if present
720 RO_update = {"member-vnf-index": member_vnf_index}
721 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
722 if vnfd_list:
723 RO_update["id"] = vnfd_list[0]["uuid"]
724 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
725 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
726 else:
727 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
728 get("additionalParamsForVnf"), nsr_id)
729 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
730 RO_update["id"] = desc["uuid"]
731 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
732 vnfd_ref, member_vnf_index, desc["uuid"]))
733 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
734 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
735 self.update_db_2("nsrs", nsr_id, db_nsr_update)
736 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
737
738 # create nsd at RO
739 nsd_ref = nsd["id"]
740 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
741 # self.logger.debug(logging_text + step)
742
743 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
744 RO_descriptor_number += 1
745 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
746 if nsd_list:
747 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
748 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
749 nsd_ref, RO_nsd_uuid))
750 else:
751 nsd_RO = deepcopy(nsd)
752 nsd_RO["id"] = RO_osm_nsd_id
753 nsd_RO.pop("_id", None)
754 nsd_RO.pop("_admin", None)
755 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
756 member_vnf_index = c_vnf["member-vnf-index"]
757 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
758 for c_vld in nsd_RO.get("vld", ()):
759 for cp in c_vld.get("vnfd-connection-point-ref", ()):
760 member_vnf_index = cp["member-vnf-index-ref"]
761 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
762
763 desc = await self.RO.create("nsd", descriptor=nsd_RO)
764 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
765 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
766 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
767 self.update_db_2("nsrs", nsr_id, db_nsr_update)
768 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
769
770 # Crate ns at RO
771 # if present use it unless in error status
772 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
773 if RO_nsr_id:
774 try:
775 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
776 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
777 desc = await self.RO.show("ns", RO_nsr_id)
778 except ROclient.ROClientException as e:
779 if e.http_code != HTTPStatus.NOT_FOUND:
780 raise
781 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
782 if RO_nsr_id:
783 ns_status, ns_status_info = self.RO.check_ns_status(desc)
784 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
785 if ns_status == "ERROR":
786 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
787 .format(RO_nsr_id)
788 self.logger.debug(logging_text + step)
789 await self.RO.delete("ns", RO_nsr_id)
790 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
791 if not RO_nsr_id:
792 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
793 # self.logger.debug(logging_text + step)
794
795 # check if VIM is creating and wait look if previous tasks in process
796 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
797 if task_dependency:
798 step = "Waiting for related tasks to be completed: {}".format(task_name)
799 self.logger.debug(logging_text + step)
800 await asyncio.wait(task_dependency, timeout=3600)
801 if ns_params.get("vnf"):
802 for vnf in ns_params["vnf"]:
803 if "vimAccountId" in vnf:
804 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
805 vnf["vimAccountId"])
806 if task_dependency:
807 step = "Waiting for related tasks to be completed: {}".format(task_name)
808 self.logger.debug(logging_text + step)
809 await asyncio.wait(task_dependency, timeout=3600)
810
811 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
812
813 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
814
815 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
816 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
817 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
818 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
819 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
820 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
821 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
822 self.update_db_2("nsrs", nsr_id, db_nsr_update)
823 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
824
825 # wait until NS is ready
826 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
827 detailed_status_old = None
828 self.logger.debug(logging_text + step)
829
830 while time() <= start_deploy + self.total_deploy_timeout:
831 desc = await self.RO.show("ns", RO_nsr_id)
832 ns_status, ns_status_info = self.RO.check_ns_status(desc)
833 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
834 if ns_status == "ERROR":
835 raise ROclient.ROClientException(ns_status_info)
836 elif ns_status == "BUILD":
837 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
838 elif ns_status == "ACTIVE":
839 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
840 try:
841 if vdu_flag:
842 self.ns_update_vnfr(db_vnfrs, desc)
843 break
844 except LcmExceptionNoMgmtIP:
845 pass
846 else:
847 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
848 if detailed_status != detailed_status_old:
849 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
851 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
852 await asyncio.sleep(5, loop=self.loop)
853 else: # total_deploy_timeout
854 raise ROclient.ROClientException("Timeout waiting ns to be ready")
855
856 step = "Updating NSR"
857 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
858
859 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
860 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
861 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
862 self.update_db_2("nsrs", nsr_id, db_nsr_update)
863 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
864
865 step = "Deployed at VIM"
866 self.logger.debug(logging_text + step)
867
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine

    The database is polled every 10 seconds: first until the nsr record holds '_admin.deployed.RO.nsr_id',
    then until the target vdur of the vnfr is ACTIVE and has an ip-address. When both pub_key and user are
    provided, the key is afterwards injected through an RO action, retrying on RO errors.
    :param logging_text: prefix use for logging
    :param nsr_id: _id of the nsr record to read the RO nsr_id from
    :param vnfr_id: _id of the vnfr record that contains the target vdur
    :param vdu_id: vdu-id-ref of the target VDU. If empty the vdur owning the VNF ip-address is used
    :param vdu_index: count-index of the target VDU (only used when vdu_id is provided)
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout (about 1 hour), when the vdur is not found or is in ERROR status, on an
        unknown RO response, or when the key injection keeps failing after 20 tries
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            # RO nsr_id was found, so the time was spent waiting for the VM itself
            raise LcmException("Timeout waiting for ip-address of vnfr_id={}, vdu_id={}, vdu_index={}".format(
                vnfr_id, vdu_id, vdu_index
            ))

        await asyncio.sleep(10, loop=self.loop)
        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        # inject public key into machine
        if pub_key and user:
            # self.logger.debug(logging_text + "Inserting RO key")
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # only the first failure is logged; the following loop iterations retry silently
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e
        else:
            # nothing to inject: having the ip address is enough
            break

    return ip_address
964
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the VCA deployments this one depends on are finished.
    A NS level VCA waits for every other entry; a VNF level VCA waits for the entries sharing its
    member-vnf-index; VDU and KDU level VCAs do not wait at all.
    :param nsr_id: _id of the nsr record, used to re-read the VCA list between polls
    :param vca_deployed_list: current content of _admin.deployed.VCA
    :param vca_index: position of this VCA inside vca_deployed_list
    :return: None
    :raises LcmException: when a dependent VCA has failed or the polls are exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        return

    # counts polls (one every 10 seconds), not seconds
    polls_left = 300
    while polls_left >= 0:
        must_wait = False
        for position, other_vca in enumerate(vca_deployed_list):
            if position == vca_index:
                continue
            own_member_index = own_vca.get("member-vnf-index")
            if own_member_index and other_vca.get("member-vnf-index") != own_member_index:
                # belongs to a different VNF: not a dependency
                continue
            other_status = other_vca.get("instantiation")
            if not other_status:
                must_wait = True
                break
            if other_status == "FAILED":
                raise LcmException("Configuration aborted because dependent charm/s has failed")
        if not must_wait:
            return

        await asyncio.sleep(10)
        polls_left -= 1
        nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = nsr_record["_admin"]["deployed"]["VCA"]

    raise LcmException("Configuration aborted because dependent charm/s timeout")
991
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
                           kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
    """
    Create (or register) the execution environment of one VCA entry, install its charm, optionally inject
    the ssh key in the VM and run the initial config primitives.

    :param logging_text: prefix use for logging
    :param vca_index: index of this entry at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: network slice instance id used as namespace prefix, or None
    :param db_nsr: nsr record. '_id' and '_admin.deployed.VCA' are read
    :param db_vnfr: vnfr record, or None when this is a NS level configuration
    :param vdu_id: vdu id for a VDU level configuration, or None
    :param kdu_name: not used by this method
    :param vdu_index: vdu count index; used at the namespace and to locate the VM
    :param config_descriptor: configuration section of the descriptor ('juju', 'config-access',
        'initial-config-primitive')
    :param deploy_params: dictionary of instantiation parameters. It is modified: 'rw_mgmt_ip' and, for a NS
        level configuration, 'ns_config_info' are added
    :param base_folder: dictionary with the 'folder' and 'pkg-dir' where the charm is stored
    :return: None
    :raises Exception: on any failure, prefixed with the step being executed. The entry is marked as
        'instantiation': 'FAILED' at database before raising
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:
        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)
        if vnfr_id:
            namespace += "." + vnfr_id
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:
            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)
        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break  # leaves only the inner loop; remaining primitives are still inspected
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"
        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM"
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        # Only attempted when the descriptor provides primitives; otherwise calling sort over None would
        # log an error for every descriptor without initial-config-primitive
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # best effort: a missing or non numeric 'seq' keeps the order of the descriptor
                self.logger.error(logging_text + step + ": " + str(e))

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

                step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"],
                                                                   primitive_params_)
                self.logger.debug(logging_text + step)
                await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict
                )
                # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "COMPLETED"})
        self.logger.debug(logging_text + step)

    except Exception as e:  # TODO not use Exception but N2VC exception
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1151
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None):
    """
    Write the state and current operation of a ns at the 'nsrs' collection. It is a best effort operation:
    any error is logged as a warning and never propagated to the caller.
    :param nsr_id: _id of the nsr record to update
    :param ns_state: value for 'nsState'. If empty the stored 'nsState' is left untouched
    :param current_operation: value for 'currentOperation'
    :param current_operation_id: value for 'currentOperationID'
    :param error_description: value for 'errorDescription' (None clears it)
    :param error_detail: value for 'errorDetail' (None clears it)
    :return: None
    """
    try:
        db_dict = {}
        if ns_state:
            db_dict["nsState"] = ns_state
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); using it here would convert this
        # best effort handler into an AttributeError
        self.logger.warning('Error writing NS status: {}'.format(e))
1165
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiate a ns: launch in parallel the deployment of KDUs, the deployment at RO/VIM and the
    configuration (N2VC) of every VNF, VDU, KDU and NS that declares a juju configuration; wait for all of
    them and write the final result at database ('nsrs' and 'nslcmops') and at the message bus.

    A failure of a launched task is reported as operationState 'FAILED_TEMP'; an error of this method
    itself (database, cancellation, unexpected exception) as 'FAILED'. In both cases nsState is 'BROKEN'.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None. Exceptions are not propagated but stored at database
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me')
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "instantiate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}     # vnf's info indexed by member-index
    # n2vc_info = {}
    task_instantiation_list = []
    task_instantiation_info = {}  # from task to info text
    exc = None
    # both are read at the 'finally' section, so they must exist even if the 'try' section fails early
    instantiated_ok = False
    error_text = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)

        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id
        )

        # read from db: operation
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        # read from db: ns
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd is replicated into ns (no db read)
        nsd = db_nsr["nsd"]
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        step = "Getting vnfrs from db"
        self.logger.debug(logging_text + step)
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}     # every vnfd data indexed by vnf name
        db_vnfds = {}         # every vnfd data indexed by vnf id
        db_vnfds_index = {}   # every vnfd data indexed by vnf member-index

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr   # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]                       # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]                     # vnfd name for this vnf
            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from cb
                step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + step)
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd     # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd          # vnfd's indexed by id
            db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]  # vnfd's indexed by member-index

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        task_kdu = asyncio.ensure_future(
            self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                db_nsr=db_nsr,
                db_vnfrs=db_vnfrs,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
        task_instantiation_info[task_kdu] = "Deploy KDUs"
        task_instantiation_list.append(task_kdu)
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        # n2vc_redesign STEP 2 Deploy Network Scenario
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        task_instantiation_info[task_ro] = "Deploy at VIM"
        task_instantiation_list.append(task_ro)

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        step = "Looking for needed vnfd to configure with proxy charm"
        self.logger.debug(logging_text + step)

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())

            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_list=task_instantiation_list,
                    task_instantiation_info=task_instantiation_info
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # vdur is None when the vnfr holds no record for this vdu: use the VNF parameters then
                if vdur and vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    deploy_params_vdu = deploy_params
                if descriptor_config and descriptor_config.get("juju"):
                    # look for vdu index in the db_vnfr["vdu"] section
                    # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
                    #     if vdur["vdu-id-ref"] == vdu_id:
                    #         break
                    # else:
                    #     raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
                    #                        "member_vnf_index={}".format(vdu_id, member_vnf_index))
                    # vdu_name = vdur.get("name")
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_list=task_instantiation_list,
                            task_instantiation_info=task_instantiation_info
                        )
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config and descriptor_config.get("juju"):
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    # look for vdu index in the db_vnfr["vdu"] section
                    # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
                    #     if vdur["vdu-id-ref"] == vdu_id:
                    #         break
                    # else:
                    #     raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
                    #                        "member_vnf_index={}".format(vdu_id, member_vnf_index))
                    # vdu_name = vdur.get("name")
                    # vdu_name = None

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_list=task_instantiation_list,
                        task_instantiation_info=task_instantiation_info
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_list=task_instantiation_list,
                task_instantiation_info=task_instantiation_info
            )

        # Wait until all tasks of "task_instantiation_list" have been finished

        # while time() <= start_deploy + self.total_deploy_timeout:
        error_text_list = []
        timeout = 3600

        # let's begin with all OK
        instantiated_ok = True
        # let's begin with RO 'running' status (later we can change it)
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"

        if task_instantiation_list:
            # wait for all tasks completion
            done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)

            for task in pending:
                instantiated_ok = False
                if task == task_ro:
                    db_nsr_update["operational-status"] = "failed"
                else:
                    db_nsr_update["config-status"] = "failed"
                self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
                error_text_list.append(task_instantiation_info[task] + ": Timeout")
            for task in done:
                if task.cancelled():
                    instantiated_ok = False
                    if task == task_ro:
                        db_nsr_update["operational-status"] = "failed"
                    else:
                        db_nsr_update["config-status"] = "failed"
                    self.logger.warning(logging_text + task_instantiation_info[task] + ": Cancelled")
                    error_text_list.append(task_instantiation_info[task] + ": Cancelled")
                else:
                    # A name different from 'exc' is used on purpose: 'exc' is reserved for errors of this
                    # method and it is evaluated at the 'finally' section. Reusing it here made the final
                    # operation state depend on which task was the last one inspected
                    task_exc = task.exception()
                    if task_exc:
                        instantiated_ok = False
                        if task == task_ro:
                            db_nsr_update["operational-status"] = "failed"
                        else:
                            db_nsr_update["config-status"] = "failed"
                        self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
                        if isinstance(task_exc, (N2VCException, ROclient.ROClientException)):
                            error_text_list.append(task_instantiation_info[task] + ": {}".format(task_exc))
                        else:
                            exc_traceback = "".join(traceback.format_exception(None, task_exc,
                                                                               task_exc.__traceback__))
                            self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
                            error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
                    else:
                        self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")

        if error_text_list:
            error_text = "\n".join(error_text_list)
            db_nsr_update["detailed-status"] = error_text
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
            db_nslcmop_update["detailed-status"] = error_text
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            # all is done
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            db_nslcmop_update["detailed-status"] = "done"
            db_nsr_update["detailed-status"] = "done"

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                             exc_info=True)
    finally:
        if exc:
            # this method was interrupted: the ns cannot be reported as READY, whatever was reached before
            instantiated_ok = False
            if not error_text:
                error_text = "{}".format(exc)
            if db_nsr:
                db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["config-status"] = "failed"
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
                ns_state = None
                error_description = None
                error_detail = None
                if instantiated_ok:
                    ns_state = "READY"
                else:
                    ns_state = "BROKEN"
                    error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
                    error_detail = error_text
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description,
                    error_detail=error_detail
                )

            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1605
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """
    Launch the KDUs found at the "kdur" list of every vnfr and wait until all the installations finish.
    One entry per KDU is written at database nsrs:_admin.deployed.K8s.<index>. When the installation ends this
    entry gets the "kdu-instance" returned by the connector, or a "detailed-status" with the error or "Timeout".
    :param logging_text: prefix to use at logging
    :param nsr_id: _id of the nsr database record to update
    :param db_nsr: content of the nsr record. Not used by this method
    :param db_vnfrs: dictionary whose values are the vnfr records where "kdur" is looked for
    :return: None
    :raises LcmException: wrapping any exception raised while preparing or waiting for the installations
    """
    # Launch kdus if present in the descriptor

    # cache, per cluster type, of k8scluster database _id -> id stored at _admin.<cluster_type>.id
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        nonlocal k8scluster_id_2_uuic
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    # 'step' and 'db_nsr_update' are bound before the 'try' because the 'except' and 'finally' clauses use them
    step = "Initializing _admin.deployed.K8s at database"
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
                                 " running"
                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                except LcmException as e:
                    error_text = str(e)
                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # NOTE(review): 'index' is not incremented here, so the next KDU is written over this
                    # failed entry. Confirm whether this is intended
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
                           "{}".format(index)}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    # wrapped in a task, as helm one, because asyncio.wait returns tasks and they are used
                    # below as keys of pending_tasks; a bare coroutine would never be found there
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1
        if not pending_tasks:
            return
        step = "Waiting for KDU installation tasks"
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:   # timeout
                for task in pending_list:
                    # pending_tasks is a dictionary: it must be indexed, not called
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
    finally:
        # TODO Write in data base
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1707
1708 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1709 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1710 base_folder, task_instantiation_list, task_instantiation_info):
1711 # launch instantiate_N2VC in a asyncio task and register task object
1712 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1713 # if not found, create one entry and update database
1714
1715 # fill db_nsr._admin.deployed.VCA.<index>
1716 vca_index = -1
1717 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1718 if not vca_deployed:
1719 continue
1720 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1721 vca_deployed.get("vdu_id") == vdu_id and \
1722 vca_deployed.get("kdu_name") == kdu_name and \
1723 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1724 break
1725 else:
1726 # not found, create one.
1727 vca_deployed = {
1728 "member-vnf-index": member_vnf_index,
1729 "vdu_id": vdu_id,
1730 "kdu_name": kdu_name,
1731 "vdu_count_index": vdu_index,
1732 "operational-status": "init", # TODO revise
1733 "detailed-status": "", # TODO revise
1734 "step": "initial-deploy", # TODO revise
1735 "vnfd_id": vnfd_id,
1736 "vdu_name": vdu_name,
1737 }
1738 vca_index += 1
1739 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1740 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1741
1742 # Launch task
1743 task_n2vc = asyncio.ensure_future(
1744 self.instantiate_N2VC(
1745 logging_text=logging_text,
1746 vca_index=vca_index,
1747 nsi_id=nsi_id,
1748 db_nsr=db_nsr,
1749 db_vnfr=db_vnfr,
1750 vdu_id=vdu_id,
1751 kdu_name=kdu_name,
1752 vdu_index=vdu_index,
1753 deploy_params=deploy_params,
1754 config_descriptor=descriptor_config,
1755 base_folder=base_folder,
1756 )
1757 )
1758 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1759 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1760 task_instantiation_list.append(task_n2vc)
1761
1762 # Check if this VNFD has a configured terminate action
1763 def _has_terminate_config_primitive(self, vnfd):
1764 vnf_config = vnfd.get("vnf-configuration")
1765 if vnf_config and vnf_config.get("terminate-config-primitive"):
1766 return True
1767 else:
1768 return False
1769
1770 @staticmethod
1771 def _get_terminate_config_primitive_seq_list(vnfd):
1772 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1773 # No need to check for existing primitive twice, already done before
1774 vnf_config = vnfd.get("vnf-configuration")
1775 seq_list = vnf_config.get("terminate-config-primitive")
1776 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1777 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1778 return seq_list_sorted
1779
1780 @staticmethod
1781 def _create_nslcmop(nsr_id, operation, params):
1782 """
1783 Creates a ns-lcm-opp content to be stored at database.
1784 :param nsr_id: internal id of the instance
1785 :param operation: instantiate, terminate, scale, action, ...
1786 :param params: user parameters for the operation
1787 :return: dictionary following SOL005 format
1788 """
1789 # Raise exception if invalid arguments
1790 if not (nsr_id and operation and params):
1791 raise LcmException(
1792 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1793 now = time()
1794 _id = str(uuid4())
1795 nslcmop = {
1796 "id": _id,
1797 "_id": _id,
1798 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1799 "operationState": "PROCESSING",
1800 "statusEnteredTime": now,
1801 "nsInstanceId": nsr_id,
1802 "lcmOperationType": operation,
1803 "startTime": now,
1804 "isAutomaticInvocation": False,
1805 "operationParams": params,
1806 "isCancelPending": False,
1807 "links": {
1808 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1809 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1810 }
1811 }
1812 return nslcmop
1813
1814 def _format_additional_params(self, params):
1815 params = params or {}
1816 for key, value in params.items():
1817 if str(value).startswith("!!yaml "):
1818 params[key] = yaml.safe_load(value[7:])
1819 return params
1820
1821 def _get_terminate_primitive_params(self, seq, vnf_index):
1822 primitive = seq.get('name')
1823 primitive_params = {}
1824 params = {
1825 "member_vnf_index": vnf_index,
1826 "primitive": primitive,
1827 "primitive_params": primitive_params,
1828 }
1829 desc_params = {}
1830 return self._map_primitive_params(seq, params, desc_params)
1831
1832 # sub-operations
1833
1834 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1835 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1836 if (op.get('operationState') == 'COMPLETED'):
1837 # b. Skip sub-operation
1838 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1839 return self.SUBOPERATION_STATUS_SKIP
1840 else:
1841 # c. Reintent executing sub-operation
1842 # The sub-operation exists, and operationState != 'COMPLETED'
1843 # Update operationState = 'PROCESSING' to indicate a reintent.
1844 operationState = 'PROCESSING'
1845 detailed_status = 'In progress'
1846 self._update_suboperation_status(
1847 db_nslcmop, op_index, operationState, detailed_status)
1848 # Return the sub-operation index
1849 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1850 # with arguments extracted from the sub-operation
1851 return op_index
1852
1853 # Find a sub-operation where all keys in a matching dictionary must match
1854 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1855 def _find_suboperation(self, db_nslcmop, match):
1856 if (db_nslcmop and match):
1857 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1858 for i, op in enumerate(op_list):
1859 if all(op.get(k) == match[k] for k in match):
1860 return i
1861 return self.SUBOPERATION_STATUS_NOT_FOUND
1862
1863 # Update status for a sub-operation given its index
1864 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1865 # Update DB for HA tasks
1866 q_filter = {'_id': db_nslcmop['_id']}
1867 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1868 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1869 self.db.set_one("nslcmops",
1870 q_filter=q_filter,
1871 update_dict=update_dict,
1872 fail_on_empty=False)
1873
1874 # Add sub-operation, return the index of the added sub-operation
1875 # Optionally, set operationState, detailed-status, and operationType
1876 # Status and type are currently set for 'scale' sub-operations:
1877 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1878 # 'detailed-status' : status message
1879 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1880 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1881 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1882 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1883 RO_nsr_id=None, RO_scaling_info=None):
1884 if not (db_nslcmop):
1885 return self.SUBOPERATION_STATUS_NOT_FOUND
1886 # Get the "_admin.operations" list, if it exists
1887 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1888 op_list = db_nslcmop_admin.get('operations')
1889 # Create or append to the "_admin.operations" list
1890 new_op = {'member_vnf_index': vnf_index,
1891 'vdu_id': vdu_id,
1892 'vdu_count_index': vdu_count_index,
1893 'primitive': primitive,
1894 'primitive_params': mapped_primitive_params}
1895 if operationState:
1896 new_op['operationState'] = operationState
1897 if detailed_status:
1898 new_op['detailed-status'] = detailed_status
1899 if operationType:
1900 new_op['lcmOperationType'] = operationType
1901 if RO_nsr_id:
1902 new_op['RO_nsr_id'] = RO_nsr_id
1903 if RO_scaling_info:
1904 new_op['RO_scaling_info'] = RO_scaling_info
1905 if not op_list:
1906 # No existing operations, create key 'operations' with current operation as first list element
1907 db_nslcmop_admin.update({'operations': [new_op]})
1908 op_list = db_nslcmop_admin.get('operations')
1909 else:
1910 # Existing operations, append operation to list
1911 op_list.append(new_op)
1912
1913 db_nslcmop_update = {'_admin.operations': op_list}
1914 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1915 op_index = len(op_list) - 1
1916 return op_index
1917
1918 # Helper methods for scale() sub-operations
1919
1920 # pre-scale/post-scale:
1921 # Check for 3 different cases:
1922 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1923 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1924 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1925 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1926 operationType, RO_nsr_id=None, RO_scaling_info=None):
1927 # Find this sub-operation
1928 if (RO_nsr_id and RO_scaling_info):
1929 operationType = 'SCALE-RO'
1930 match = {
1931 'member_vnf_index': vnf_index,
1932 'RO_nsr_id': RO_nsr_id,
1933 'RO_scaling_info': RO_scaling_info,
1934 }
1935 else:
1936 match = {
1937 'member_vnf_index': vnf_index,
1938 'primitive': vnf_config_primitive,
1939 'primitive_params': primitive_params,
1940 'lcmOperationType': operationType
1941 }
1942 op_index = self._find_suboperation(db_nslcmop, match)
1943 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1944 # a. New sub-operation
1945 # The sub-operation does not exist, add it.
1946 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1947 # The following parameters are set to None for all kind of scaling:
1948 vdu_id = None
1949 vdu_count_index = None
1950 vdu_name = None
1951 if (RO_nsr_id and RO_scaling_info):
1952 vnf_config_primitive = None
1953 primitive_params = None
1954 else:
1955 RO_nsr_id = None
1956 RO_scaling_info = None
1957 # Initial status for sub-operation
1958 operationState = 'PROCESSING'
1959 detailed_status = 'In progress'
1960 # Add sub-operation for pre/post-scaling (zero or more operations)
1961 self._add_suboperation(db_nslcmop,
1962 vnf_index,
1963 vdu_id,
1964 vdu_count_index,
1965 vdu_name,
1966 vnf_config_primitive,
1967 primitive_params,
1968 operationState,
1969 detailed_status,
1970 operationType,
1971 RO_nsr_id,
1972 RO_scaling_info)
1973 return self.SUBOPERATION_STATUS_NEW
1974 else:
1975 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1976 # or op_index (operationState != 'COMPLETED')
1977 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1978
1979 # Helper methods for terminate()
1980
1981 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1982 """ Create a primitive with params from VNFD
1983 Called from terminate() before deleting instance
1984 Calls action() to execute the primitive """
1985 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1986 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1987 db_vnfds = {}
1988 # Loop over VNFRs
1989 for vnfr in db_vnfrs_list:
1990 vnfd_id = vnfr["vnfd-id"]
1991 vnf_index = vnfr["member-vnf-index-ref"]
1992 if vnfd_id not in db_vnfds:
1993 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1994 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1995 db_vnfds[vnfd_id] = vnfd
1996 vnfd = db_vnfds[vnfd_id]
1997 if not self._has_terminate_config_primitive(vnfd):
1998 continue
1999 # Get the primitive's sorted sequence list
2000 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2001 for seq in seq_list:
2002 # For each sequence in list, get primitive and call _ns_execute_primitive()
2003 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2004 vnf_index, seq.get("name"))
2005 self.logger.debug(logging_text + step)
2006 # Create the primitive for each sequence, i.e. "primitive": "touch"
2007 primitive = seq.get('name')
2008 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2009 # The following 3 parameters are currently set to None for 'terminate':
2010 # vdu_id, vdu_count_index, vdu_name
2011 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2012 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2013 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2014 # Add sub-operation
2015 self._add_suboperation(db_nslcmop,
2016 nslcmop_id,
2017 vnf_index,
2018 vdu_id,
2019 vdu_count_index,
2020 vdu_name,
2021 primitive,
2022 mapped_primitive_params)
2023 # Sub-operations: Call _ns_execute_primitive() instead of action()
2024 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2025 # nsr_deployed = db_nsr["_admin"]["deployed"]
2026
2027 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2028 # nsr_id, nslcmop_terminate_action_id)
2029 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2030 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2031 # if result not in result_ok:
2032 # raise LcmException(
2033 # "terminate_primitive_action for vnf_member_index={}",
2034 # " primitive={} fails with error {}".format(
2035 # vnf_index, seq.get("name"), result_detail))
2036
2037 # TODO: find ee_id
2038 ee_id = None
2039 try:
2040 await self.n2vc.exec_primitive(
2041 ee_id=ee_id,
2042 primitive_name=primitive,
2043 params_dict=mapped_primitive_params
2044 )
2045 except Exception as e:
2046 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2047 raise LcmException(
2048 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2049 .format(vnf_index, seq.get("name"), e),
2050 )
2051
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance. Steps:
        - lock the HA task; nothing is done if it is locked by other worker
        - execute the terminate primitives (_terminate_action)
        - launch, as background tasks, the deletion of the namespace at self.n2vc and the uninstall of the
          deployed KDUs (helm or juju)
        - delete the ns from RO waiting up to 20 minutes, and then the nsd and vnfds from RO
        - update nsrs and nslcmops at database, write the ns status and send the "terminated" message
        - wait up to one hour for the background tasks
    Errors are not raised; they are annotated at database.
    :param nsr_id: id of the ns instance to terminate
    :param nslcmop_id: id of the terminate operation
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "terminate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    pending_tasks = []
    # bound before the 'try' because it is read at 'finally', that is also reached when an exception or the
    # early return happen before the result of the termination is known
    terminate_ok = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to delete. It is not a failure, so the ns is not marked as BROKEN at 'finally'
            terminate_ok = True
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        # Call internal terminate action
        await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        # remove NS
        try:
            step = "delete execution environment"
            self.logger.debug(logging_text + step)

            task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            pending_tasks.append(task_delete_ee)
        except Exception as e:
            msg = "Failed while deleting NS in VCA: {}".format(e)
            self.logger.error(msg)
            failed_detail.append(msg)

        try:
            # Delete from k8scluster
            step = "delete kdus"
            self.logger.debug(logging_text + step)
            # print(nsr_deployed)
            if nsr_deployed:
                for kdu in nsr_deployed.get("K8s", ()):
                    kdu_instance = kdu.get("kdu-instance")
                    if not kdu_instance:
                        continue
                    if kdu.get("k8scluster-type") == "chart":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    elif kdu.get("k8scluster-type") == "juju":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    else:
                        # logged using the logger, as everywhere else in this method
                        self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                          format(kdu.get("k8scluster-type")))
                        continue
                    pending_tasks.append(task_delete_kdu_instance)
        except LcmException as e:
            msg = "Failed while deleting KDUs from NS: {}".format(e)
            self.logger.error(msg)
            failed_detail.append(msg)

        # remove from RO
        RO_fail = False

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns from VIM"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await self.RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=RO_nsr_id,
                        extra_item="action",
                        extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd from RO"
                await self.RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await self.RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            terminate_ok = False
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            terminate_ok = True
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                if terminate_ok:
                    ns_state = "IDLE"
                    error_description = None
                    error_detail = None
                else:
                    ns_state = "BROKEN"
                    error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
                    # failed_detail is empty when the failure comes from an exception; use its text then
                    error_detail = "; ".join(failed_detail) or (str(exc) if exc else None)

                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description,
                    error_detail=error_detail
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        # wait for pending tasks
        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            _done, pending = await asyncio.wait(pending_tasks, timeout=3600)
            if not pending:
                self.logger.debug(logging_text + 'All tasks finished...')
            else:
                self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2342
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. For every parameter declared at the
    descriptor, the value provided by the user is used; otherwise its "value" or, failing that, its
    "default-value". A descriptor value enclosed between < > is replaced by the instantiation param of that name.
    Values that are dict, list or tuple are dumped to a one-line yaml string; a string starting with "!!yaml "
    is passed without that prefix.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as an empty dictionary
    :param instantiation_params: Instantiation params provided by user. None is treated as an empty dictionary
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has no user value and no "value"/"default-value" at the descriptor, or
        if a <name> reference is not present at instantiation_params
    """
    # callers may have nothing to provide; avoid a TypeError on the 'in' lookups below
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    # 'or ()' also covers a descriptor that carries "parameter" explicitly set to None
    for parameter in primitive_desc.get("parameter") or ():
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" takes precedence over "default-value"
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                # reference to an instantiation parameter, e.g. "<rw_mgmt_ip>"
                referenced_key = value[1:-1]
                if referenced_key not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[referenced_key]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            # structured values are sent as a single-line (flow style) yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # remove the 7 characters of the "!!yaml " marker
            value = value[7:]
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
2385
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
    """
    Looks for the deployed VCA record that matches the target and executes a primitive on its execution
    environment by means of self.n2vc.exec_primitive, retrying on failure.
    :param db_deployed: dictionary whose "VCA" list is scanned (callers pass the nsr "_admin.deployed" content)
    :param member_vnf_index: must be equal to the "member-vnf-index" of the VCA record
    :param vdu_id: must be equal to the "vdu_id" of the VCA record
    :param vdu_name: if it evaluates to True, must be equal to the "vdu_name" of the VCA record
    :param vdu_count_index: if it evaluates to True, must be equal to the "vdu_count_index" of the VCA record
    :param primitive: name of the primitive. For "config" the params are sent wrapped as {"params": ...}
    :param primitive_params: dictionary with the params of the primitive
    :param retries: number of additional attempts after a failed execution
    :param retries_interval: seconds to wait before a new attempt
    :return: tuple (detail, status). status is 'OK' and detail is the output of the primitive; or status is
        'FAIL' and detail is a text describing the error. Errors are reported this way, not raised; only the
        cancellation of the task is propagated
    """
    try:
        # find vca_deployed record for this action
        for vca_deployed in db_deployed["VCA"]:
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # NOTE(review): a vdu_count_index of 0 is falsy and hence does not filter; confirm this is intended
            if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            # vca_deployed not found
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        # get ee_id
        ee_id = vca_deployed.get("ee_id")
        if not ee_id:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "execution environment"
                               .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        if primitive == "config":
            primitive_params = {"params": primitive_params}

        while retries >= 0:
            try:
                output = await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=primitive_params
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                # a cancellation is not a failure of the primitive: do not retry it
                raise
            except Exception as e:
                retries -= 1
                if retries < 0:
                    return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
                self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                # wait and retry. The 'loop' argument is not passed: it was removed from asyncio.sleep in
                # python 3.10 and the running loop is used anyway
                await asyncio.sleep(retries_interval)

        return output, 'OK'

    except asyncio.CancelledError:
        # before python 3.8 CancelledError derives from Exception; let the caller handle the cancellation
        raise
    except Exception as e:
        return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2438
async def action(self, nsr_id, nslcmop_id):
    """
    Executes the primitive requested by a nslcmop of type action. The target is selected from the
    operationParams: a VDU (vdu_id), a KDU (kdu_name), a VNF (member_vnf_index) or, if none is provided, the NS.
    VDU/VNF/NS primitives are looked up at the descriptor and executed with self._ns_execute_primitive. For a
    KDU only "upgrade", "rollback" and "status" are handled, using the helm or juju connector.
    The result is stored at the nslcmops record, the nsrs operation markers are cleaned, and an "actioned"
    message is written to kafka topic "ns" when a final operation state is available.
    Exceptions raised while processing, including the cancellation of the task, are caught and reported as a
    FAILED operation; they are not propagated.
    :param nsr_id: _id of the nsrs record
    :param nslcmop_id: _id of the nslcmops record
    :return: None if the HA lock of this operation is not obtained. Otherwise the tuple
        (operationState, detailed-status); both are None if it fails before the nslcmop is read from database
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        operation_params = db_nslcmop["operationParams"]
        vnf_index = operation_params.get("member_vnf_index")
        vdu_id = operation_params.get("vdu_id")
        kdu_name = operation_params.get("kdu_name")
        vdu_count_index = operation_params.get("vdu_count_index")
        vdu_name = operation_params.get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")  # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if kdur is None:
                # report it as a controlled error instead of an AttributeError on None
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name != kdu["kdu-name"]:
                    continue
                if primitive not in ("upgrade", "rollback", "status"):
                    # the KDU exists; do not let it end in the misleading "KDU not found" below
                    raise LcmException("Primitive '{}' is not supported for KDU '{}'".format(primitive, kdu_name))
                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                if primitive == "upgrade":
                    if desc_params.get("kdu_model"):
                        # model provided by user; it is not a param to be passed to the KDU
                        kdu_model = desc_params.pop("kdu_model")
                    else:
                        # reuse the deployed model, discarding what follows ':' in "<model>:<suffix>"
                        kdu_model = kdu.get("kdu-model")
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                cluster_type = kdu.get("k8scluster-type")
                if cluster_type == "chart":
                    k8s_connector = self.k8sclusterhelm
                elif cluster_type == "juju":
                    k8s_connector = self.k8sclusterjuju
                else:
                    msg = "k8scluster-type not defined"
                    raise LcmException(msg)

                if primitive == "upgrade":
                    output = await k8s_connector.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                         kdu_instance=kdu.get("kdu-instance"),
                                                         atomic=True, kdu_model=kdu_model,
                                                         params=desc_params, db_dict=db_dict,
                                                         timeout=300)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                elif primitive == "rollback":
                    output = await k8s_connector.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu.get("kdu-instance"),
                                                          db_dict=db_dict)
                else:  # status
                    output = await k8s_connector.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                            kdu_instance=kdu.get("kdu-instance"))
                break
            else:
                raise LcmException("KDU '{}' not found".format(kdu_name))

            # set also the nslcmop_operation_state* variables; otherwise the kafka notification at finally is
            # skipped and (None, None) is returned for a KDU action
            if output:
                nslcmop_operation_state = 'COMPLETED'
                nslcmop_operation_state_detail = output
            else:
                nslcmop_operation_state = 'FAILED'
                nslcmop_operation_state_detail = ''
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail
            db_nslcmop_update["operationState"] = nslcmop_operation_state
            db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        # params that can be referenced as <name> from the descriptor of the primitive
        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    raise LcmException("VDU '{}' not found at vnfr".format(vdu_id))
                if vdur.get("additionalParams"):
                    # params at vdu level replace the ones at vnf level
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        result = 'COMPLETED' if detail == 'OK' else 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # the failure can only be stored if the nslcmop was obtained from database
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return applies also to the 'return' statements inside the try block
        return nslcmop_operation_state, nslcmop_operation_state_detail
2682
2683 async def scale(self, nsr_id, nslcmop_id):
2684
2685 # Try to lock HA task here
2686 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2687 if not task_is_locked_by_me:
2688 return
2689
2690 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2691 self.logger.debug(logging_text + "Enter")
2692 # get all needed from database
2693 db_nsr = None
2694 db_nslcmop = None
2695 db_nslcmop_update = {}
2696 nslcmop_operation_state = None
2697 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2698 "_admin.current-operation": nslcmop_id,
2699 "_admin.operation-type": "scale"}
2700 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2701 exc = None
2702 # in case of error, indicates what part of scale was failed to put nsr at error status
2703 scale_process = None
2704 old_operational_status = ""
2705 old_config_status = ""
2706 vnfr_scaled = False
2707 try:
2708 # wait for any previous tasks in process
2709 step = "Waiting for previous operations to terminate"
2710 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2711
2712 self._write_ns_status(
2713 nsr_id=nsr_id,
2714 ns_state=None,
2715 current_operation="SCALING",
2716 current_operation_id=nslcmop_id
2717 )
2718
2719 step = "Getting nslcmop from database"
2720 self.logger.debug(step + " after having waited for previous tasks to be completed")
2721 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2722 step = "Getting nsr from database"
2723 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2724
2725 old_operational_status = db_nsr["operational-status"]
2726 old_config_status = db_nsr["config-status"]
2727 step = "Parsing scaling parameters"
2728 # self.logger.debug(step)
2729 db_nsr_update["operational-status"] = "scaling"
2730 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2731 nsr_deployed = db_nsr["_admin"].get("deployed")
2732
2733 #######
2734 nsr_deployed = db_nsr["_admin"].get("deployed")
2735 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2736 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2737 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2738 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2739 #######
2740
2741 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2742 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2743 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2744 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2745 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2746
2747 # for backward compatibility
2748 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2749 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2750 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2751 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2752
2753 step = "Getting vnfr from database"
2754 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2755 step = "Getting vnfd from database"
2756 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2757
2758 step = "Getting scaling-group-descriptor"
2759 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2760 if scaling_descriptor["name"] == scaling_group:
2761 break
2762 else:
2763 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2764 "at vnfd:scaling-group-descriptor".format(scaling_group))
2765
2766 # cooldown_time = 0
2767 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2768 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2769 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2770 # break
2771
2772 # TODO check if ns is in a proper status
2773 step = "Sending scale order to VIM"
2774 nb_scale_op = 0
2775 if not db_nsr["_admin"].get("scaling-group"):
2776 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2777 admin_scale_index = 0
2778 else:
2779 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2780 if admin_scale_info["name"] == scaling_group:
2781 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2782 break
2783 else: # not found, set index one plus last element and add new entry with the name
2784 admin_scale_index += 1
2785 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2786 RO_scaling_info = []
2787 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2788 if scaling_type == "SCALE_OUT":
2789 # count if max-instance-count is reached
2790 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2791 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2792 if nb_scale_op >= max_instance_count:
2793 raise LcmException("reached the limit of {} (max-instance-count) "
2794 "scaling-out operations for the "
2795 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2796
2797 nb_scale_op += 1
2798 vdu_scaling_info["scaling_direction"] = "OUT"
2799 vdu_scaling_info["vdu-create"] = {}
2800 for vdu_scale_info in scaling_descriptor["vdu"]:
2801 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2802 "type": "create", "count": vdu_scale_info.get("count", 1)})
2803 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2804
2805 elif scaling_type == "SCALE_IN":
2806 # count if min-instance-count is reached
2807 min_instance_count = 0
2808 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2809 min_instance_count = int(scaling_descriptor["min-instance-count"])
2810 if nb_scale_op <= min_instance_count:
2811 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2812 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2813 nb_scale_op -= 1
2814 vdu_scaling_info["scaling_direction"] = "IN"
2815 vdu_scaling_info["vdu-delete"] = {}
2816 for vdu_scale_info in scaling_descriptor["vdu"]:
2817 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2818 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2819 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2820
2821 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2822 vdu_create = vdu_scaling_info.get("vdu-create")
2823 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2824 if vdu_scaling_info["scaling_direction"] == "IN":
2825 for vdur in reversed(db_vnfr["vdur"]):
2826 if vdu_delete.get(vdur["vdu-id-ref"]):
2827 vdu_delete[vdur["vdu-id-ref"]] -= 1
2828 vdu_scaling_info["vdu"].append({
2829 "name": vdur["name"],
2830 "vdu_id": vdur["vdu-id-ref"],
2831 "interface": []
2832 })
2833 for interface in vdur["interfaces"]:
2834 vdu_scaling_info["vdu"][-1]["interface"].append({
2835 "name": interface["name"],
2836 "ip_address": interface["ip-address"],
2837 "mac_address": interface.get("mac-address"),
2838 })
2839 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2840
2841 # PRE-SCALE BEGIN
2842 step = "Executing pre-scale vnf-config-primitive"
2843 if scaling_descriptor.get("scaling-config-action"):
2844 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2845 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2846 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2847 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2848 step = db_nslcmop_update["detailed-status"] = \
2849 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2850
2851 # look for primitive
2852 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2853 if config_primitive["name"] == vnf_config_primitive:
2854 break
2855 else:
2856 raise LcmException(
2857 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2858 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2859 "primitive".format(scaling_group, config_primitive))
2860
2861 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2862 if db_vnfr.get("additionalParamsForVnf"):
2863 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2864
2865 scale_process = "VCA"
2866 db_nsr_update["config-status"] = "configuring pre-scaling"
2867 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2868
2869 # Pre-scale reintent check: Check if this sub-operation has been executed before
2870 op_index = self._check_or_add_scale_suboperation(
2871 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2872 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2873 # Skip sub-operation
2874 result = 'COMPLETED'
2875 result_detail = 'Done'
2876 self.logger.debug(logging_text +
2877 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2878 vnf_config_primitive, result, result_detail))
2879 else:
2880 if (op_index == self.SUBOPERATION_STATUS_NEW):
2881 # New sub-operation: Get index of this sub-operation
2882 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2883 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2884 format(vnf_config_primitive))
2885 else:
2886 # Reintent: Get registered params for this existing sub-operation
2887 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2888 vnf_index = op.get('member_vnf_index')
2889 vnf_config_primitive = op.get('primitive')
2890 primitive_params = op.get('primitive_params')
2891 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2892 format(vnf_config_primitive))
2893 # Execute the primitive, either with new (first-time) or registered (reintent) args
2894 result, result_detail = await self._ns_execute_primitive(
2895 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2896 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2897 vnf_config_primitive, result, result_detail))
2898 # Update operationState = COMPLETED | FAILED
2899 self._update_suboperation_status(
2900 db_nslcmop, op_index, result, result_detail)
2901
2902 if result == "FAILED":
2903 raise LcmException(result_detail)
2904 db_nsr_update["config-status"] = old_config_status
2905 scale_process = None
2906 # PRE-SCALE END
2907
2908 # SCALE RO - BEGIN
2909 # Should this block be skipped if 'RO_nsr_id' == None ?
2910 # if (RO_nsr_id and RO_scaling_info):
2911 if RO_scaling_info:
2912 scale_process = "RO"
2913 # Scale RO reintent check: Check if this sub-operation has been executed before
2914 op_index = self._check_or_add_scale_suboperation(
2915 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2916 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2917 # Skip sub-operation
2918 result = 'COMPLETED'
2919 result_detail = 'Done'
2920 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2921 result, result_detail))
2922 else:
2923 if (op_index == self.SUBOPERATION_STATUS_NEW):
2924 # New sub-operation: Get index of this sub-operation
2925 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2926 self.logger.debug(logging_text + "New sub-operation RO")
2927 else:
2928 # Reintent: Get registered params for this existing sub-operation
2929 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2930 RO_nsr_id = op.get('RO_nsr_id')
2931 RO_scaling_info = op.get('RO_scaling_info')
2932 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2933 vnf_config_primitive))
2934
2935 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2936 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2937 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2938 # wait until ready
2939 RO_nslcmop_id = RO_desc["instance_action_id"]
2940 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2941
2942 RO_task_done = False
2943 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2944 detailed_status_old = None
2945 self.logger.debug(logging_text + step)
2946
2947 deployment_timeout = 1 * 3600 # One hour
2948 while deployment_timeout > 0:
2949 if not RO_task_done:
2950 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2951 extra_item_id=RO_nslcmop_id)
2952 ns_status, ns_status_info = self.RO.check_action_status(desc)
2953 if ns_status == "ERROR":
2954 raise ROclient.ROClientException(ns_status_info)
2955 elif ns_status == "BUILD":
2956 detailed_status = step + "; {}".format(ns_status_info)
2957 elif ns_status == "ACTIVE":
2958 RO_task_done = True
2959 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2960 self.logger.debug(logging_text + step)
2961 else:
2962 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2963 else:
2964
2965 if ns_status == "ERROR":
2966 raise ROclient.ROClientException(ns_status_info)
2967 elif ns_status == "BUILD":
2968 detailed_status = step + "; {}".format(ns_status_info)
2969 elif ns_status == "ACTIVE":
2970 step = detailed_status = \
2971 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2972 if not vnfr_scaled:
2973 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2974 vnfr_scaled = True
2975 try:
2976 desc = await self.RO.show("ns", RO_nsr_id)
2977 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2978 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2979 break
2980 except LcmExceptionNoMgmtIP:
2981 pass
2982 else:
2983 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2984 if detailed_status != detailed_status_old:
2985 self._update_suboperation_status(
2986 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2987 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2988 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2989
2990 await asyncio.sleep(5, loop=self.loop)
2991 deployment_timeout -= 5
2992 if deployment_timeout <= 0:
2993 self._update_suboperation_status(
2994 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2995 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2996
2997 # update VDU_SCALING_INFO with the obtained ip_addresses
2998 if vdu_scaling_info["scaling_direction"] == "OUT":
2999 for vdur in reversed(db_vnfr["vdur"]):
3000 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3001 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3002 vdu_scaling_info["vdu"].append({
3003 "name": vdur["name"],
3004 "vdu_id": vdur["vdu-id-ref"],
3005 "interface": []
3006 })
3007 for interface in vdur["interfaces"]:
3008 vdu_scaling_info["vdu"][-1]["interface"].append({
3009 "name": interface["name"],
3010 "ip_address": interface["ip-address"],
3011 "mac_address": interface.get("mac-address"),
3012 })
3013 del vdu_scaling_info["vdu-create"]
3014
3015 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3016 # SCALE RO - END
3017
3018 scale_process = None
3019 if db_nsr_update:
3020 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3021
3022 # POST-SCALE BEGIN
3023 # execute primitive service POST-SCALING
3024 step = "Executing post-scale vnf-config-primitive"
3025 if scaling_descriptor.get("scaling-config-action"):
3026 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3027 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3028 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3029 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3030 step = db_nslcmop_update["detailed-status"] = \
3031 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3032
3033 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3034 if db_vnfr.get("additionalParamsForVnf"):
3035 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3036
3037 # look for primitive
3038 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3039 if config_primitive["name"] == vnf_config_primitive:
3040 break
3041 else:
3042 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3043 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3044 "match any vnf-configuration:config-primitive".format(scaling_group,
3045 config_primitive))
3046 scale_process = "VCA"
3047 db_nsr_update["config-status"] = "configuring post-scaling"
3048 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3049
3050 # Post-scale reintent check: Check if this sub-operation has been executed before
3051 op_index = self._check_or_add_scale_suboperation(
3052 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3053 if op_index == self.SUBOPERATION_STATUS_SKIP:
3054 # Skip sub-operation
3055 result = 'COMPLETED'
3056 result_detail = 'Done'
3057 self.logger.debug(logging_text +
3058 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3059 format(vnf_config_primitive, result, result_detail))
3060 else:
3061 if op_index == self.SUBOPERATION_STATUS_NEW:
3062 # New sub-operation: Get index of this sub-operation
3063 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3064 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3065 format(vnf_config_primitive))
3066 else:
3067 # Reintent: Get registered params for this existing sub-operation
3068 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3069 vnf_index = op.get('member_vnf_index')
3070 vnf_config_primitive = op.get('primitive')
3071 primitive_params = op.get('primitive_params')
3072 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3073 format(vnf_config_primitive))
3074 # Execute the primitive, either with new (first-time) or registered (reintent) args
3075 result, result_detail = await self._ns_execute_primitive(
3076 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3077 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3078 vnf_config_primitive, result, result_detail))
3079 # Update operationState = COMPLETED | FAILED
3080 self._update_suboperation_status(
3081 db_nslcmop, op_index, result, result_detail)
3082
3083 if result == "FAILED":
3084 raise LcmException(result_detail)
3085 db_nsr_update["config-status"] = old_config_status
3086 scale_process = None
3087 # POST-SCALE END
3088
3089 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3090 db_nslcmop_update["statusEnteredTime"] = time()
3091 db_nslcmop_update["detailed-status"] = "done"
3092 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3093 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3094 else old_operational_status
3095 db_nsr_update["config-status"] = old_config_status
3096 return
3097 except (ROclient.ROClientException, DbException, LcmException) as e:
3098 self.logger.error(logging_text + "Exit Exception {}".format(e))
3099 exc = e
3100 except asyncio.CancelledError:
3101 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3102 exc = "Operation was cancelled"
3103 except Exception as e:
3104 exc = traceback.format_exc()
3105 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3106 finally:
3107 if exc:
3108 if db_nslcmop:
3109 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3110 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3111 db_nslcmop_update["statusEnteredTime"] = time()
3112 if db_nsr:
3113 db_nsr_update["operational-status"] = old_operational_status
3114 db_nsr_update["config-status"] = old_config_status
3115 db_nsr_update["detailed-status"] = ""
3116 db_nsr_update["_admin.nslcmop"] = None
3117 if scale_process:
3118 if "VCA" in scale_process:
3119 db_nsr_update["config-status"] = "failed"
3120 if "RO" in scale_process:
3121 db_nsr_update["operational-status"] = "failed"
3122 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3123 exc)
3124 try:
3125 if db_nslcmop and db_nslcmop_update:
3126 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3127 if db_nsr:
3128 db_nsr_update["_admin.current-operation"] = None
3129 db_nsr_update["_admin.operation-type"] = None
3130 db_nsr_update["_admin.nslcmop"] = None
3131 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3132
3133 self._write_ns_status(
3134 nsr_id=nsr_id,
3135 ns_state=None,
3136 current_operation="IDLE",
3137 current_operation_id=None
3138 )
3139
3140 except DbException as e:
3141 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3142 if nslcmop_operation_state:
3143 try:
3144 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3145 "operationState": nslcmop_operation_state},
3146 loop=self.loop)
3147 # if cooldown_time:
3148 # await asyncio.sleep(cooldown_time, loop=self.loop)
3149 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3150 except Exception as e:
3151 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3152 self.logger.debug(logging_text + "Exit")
3153 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")