87eaecd578b9b6f2fdf50d2b1343eb7a8f8a38c4
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 def get_iterable(in_dict, in_key):
47 """
48 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
49 :param in_dict: a dictionary
50 :param in_key: the key to look for at in_dict
51 :return: in_dict[in_var] or () if it is None or not present
52 """
53 if not in_dict.get(in_key):
54 return ()
55 return in_dict[in_key]
56
57
58 def populate_dict(target_dict, key_list, value):
59 """
60 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
61 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
62 :param target_dict: dictionary to be changed
63 :param key_list: list of keys to insert at target_dict
64 :param value:
65 :return: None
66 """
67 for key in key_list[0:-1]:
68 if key not in target_dict:
69 target_dict[key] = {}
70 target_dict = target_dict[key]
71 target_dict[key_list[-1]] = value
72
73
74 class NsLcm(LcmBase):
75 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
76 total_deploy_timeout = 2 * 3600 # global timeout for deployment
77 timeout_charm_delete = 10 * 60
78 timeout_primitive = 10 * 60 # timeout for primitive execution
79
80 SUBOPERATION_STATUS_NOT_FOUND = -1
81 SUBOPERATION_STATUS_NEW = -2
82 SUBOPERATION_STATUS_SKIP = -3
83
84 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
85 """
86 Init, Connect to database, filesystem storage, and messaging
87 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
88 :return: None
89 """
90 super().__init__(
91 db=db,
92 msg=msg,
93 fs=fs,
94 logger=logging.getLogger('lcm.ns')
95 )
96
97 self.loop = loop
98 self.lcm_tasks = lcm_tasks
99 self.ro_config = ro_config
100 self.vca_config = vca_config
101 if 'pubkey' in self.vca_config:
102 self.vca_config['public_key'] = self.vca_config['pubkey']
103 if 'cacert' in self.vca_config:
104 self.vca_config['ca_cert'] = self.vca_config['cacert']
105 if 'apiproxy' in self.vca_config:
106 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
107
108 # create N2VC connector
109 self.n2vc = N2VCJujuConnector(
110 db=self.db,
111 fs=self.fs,
112 log=self.logger,
113 loop=self.loop,
114 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
115 username=self.vca_config.get('user', None),
116 vca_config=self.vca_config,
117 on_update_db=self._on_update_n2vc_db,
118 # ca_cert=self.vca_config.get('cacert'),
119 # api_proxy=self.vca_config.get('apiproxy'),
120 )
121
122 self.k8sclusterhelm = K8sHelmConnector(
123 kubectl_command=self.vca_config.get("kubectlpath"),
124 helm_command=self.vca_config.get("helmpath"),
125 fs=self.fs,
126 log=self.logger,
127 db=self.db,
128 on_update_db=None,
129 )
130
131 self.k8sclusterjuju = K8sJujuConnector(
132 kubectl_command=self.vca_config.get("kubectlpath"),
133 juju_command=self.vca_config.get("jujupath"),
134 fs=self.fs,
135 log=self.logger,
136 db=self.db,
137 on_update_db=None,
138 )
139
140 # create RO client
141 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
142
143 def _on_update_n2vc_db(self, table, filter, path, updated_data):
144
145 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
146 .format(table, filter, path, updated_data))
147
148 return
149 # write NS status to database
150 # try:
151 # # nsrs_id = filter.get('_id')
152 # # print(nsrs_id)
153 # # get ns record
154 # nsr = self.db.get_one(table=table, q_filter=filter)
155 # # get VCA deployed list
156 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
157 # # get RO deployed
158 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
159 # for vca in vca_list:
160 # # status = vca.get('status')
161 # # print(status)
162 # # detailed_status = vca.get('detailed-status')
163 # # print(detailed_status)
164 # # for ro in ro_list:
165 # # print(ro)
166 #
167 # except Exception as e:
168 # self.logger.error('Error writing NS status to db: {}'.format(e))
169
170 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
171 """
172 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
173 :param vnfd: input vnfd
174 :param new_id: overrides vnf id if provided
175 :param additionalParams: Instantiation params for VNFs provided
176 :param nsrId: Id of the NSR
177 :return: copy of vnfd
178 """
179 try:
180 vnfd_RO = deepcopy(vnfd)
181 # remove unused by RO configuration, monitoring, scaling and internal keys
182 vnfd_RO.pop("_id", None)
183 vnfd_RO.pop("_admin", None)
184 vnfd_RO.pop("vnf-configuration", None)
185 vnfd_RO.pop("monitoring-param", None)
186 vnfd_RO.pop("scaling-group-descriptor", None)
187 vnfd_RO.pop("kdu", None)
188 vnfd_RO.pop("k8s-cluster", None)
189 if new_id:
190 vnfd_RO["id"] = new_id
191
192 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
193 for vdu in get_iterable(vnfd_RO, "vdu"):
194 cloud_init_file = None
195 if vdu.get("cloud-init-file"):
196 base_folder = vnfd["_admin"]["storage"]
197 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
198 vdu["cloud-init-file"])
199 with self.fs.file_open(cloud_init_file, "r") as ci_file:
200 cloud_init_content = ci_file.read()
201 vdu.pop("cloud-init-file", None)
202 elif vdu.get("cloud-init"):
203 cloud_init_content = vdu["cloud-init"]
204 else:
205 continue
206
207 env = Environment()
208 ast = env.parse(cloud_init_content)
209 mandatory_vars = meta.find_undeclared_variables(ast)
210 if mandatory_vars:
211 for var in mandatory_vars:
212 if not additionalParams or var not in additionalParams.keys():
213 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
214 "file, must be provided in the instantiation parameters inside the "
215 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
216 template = Template(cloud_init_content)
217 cloud_init_content = template.render(additionalParams or {})
218 vdu["cloud-init"] = cloud_init_content
219
220 return vnfd_RO
221 except FsException as e:
222 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
223 format(vnfd["id"], vdu["id"], cloud_init_file, e))
224 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
225 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
226 format(vnfd["id"], vdu["id"], e))
227
228 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
229 """
230 Creates a RO ns descriptor from OSM ns_instantiate params
231 :param ns_params: OSM instantiate params
232 :return: The RO ns descriptor
233 """
234 vim_2_RO = {}
235 wim_2_RO = {}
236 # TODO feature 1417: Check that no instantiation is set over PDU
237 # check if PDU forces a concrete vim-network-id and add it
238 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
239
240 def vim_account_2_RO(vim_account):
241 if vim_account in vim_2_RO:
242 return vim_2_RO[vim_account]
243
244 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
245 if db_vim["_admin"]["operationalState"] != "ENABLED":
246 raise LcmException("VIM={} is not available. operationalState={}".format(
247 vim_account, db_vim["_admin"]["operationalState"]))
248 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
249 vim_2_RO[vim_account] = RO_vim_id
250 return RO_vim_id
251
252 def wim_account_2_RO(wim_account):
253 if isinstance(wim_account, str):
254 if wim_account in wim_2_RO:
255 return wim_2_RO[wim_account]
256
257 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
258 if db_wim["_admin"]["operationalState"] != "ENABLED":
259 raise LcmException("WIM={} is not available. operationalState={}".format(
260 wim_account, db_wim["_admin"]["operationalState"]))
261 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
262 wim_2_RO[wim_account] = RO_wim_id
263 return RO_wim_id
264 else:
265 return wim_account
266
267 def ip_profile_2_RO(ip_profile):
268 RO_ip_profile = deepcopy((ip_profile))
269 if "dns-server" in RO_ip_profile:
270 if isinstance(RO_ip_profile["dns-server"], list):
271 RO_ip_profile["dns-address"] = []
272 for ds in RO_ip_profile.pop("dns-server"):
273 RO_ip_profile["dns-address"].append(ds['address'])
274 else:
275 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
276 if RO_ip_profile.get("ip-version") == "ipv4":
277 RO_ip_profile["ip-version"] = "IPv4"
278 if RO_ip_profile.get("ip-version") == "ipv6":
279 RO_ip_profile["ip-version"] = "IPv6"
280 if "dhcp-params" in RO_ip_profile:
281 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
282 return RO_ip_profile
283
284 if not ns_params:
285 return None
286 RO_ns_params = {
287 # "name": ns_params["nsName"],
288 # "description": ns_params.get("nsDescription"),
289 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
290 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
291 # "scenario": ns_params["nsdId"],
292 }
293
294 n2vc_key_list = n2vc_key_list or []
295 for vnfd_ref, vnfd in vnfd_dict.items():
296 vdu_needed_access = []
297 mgmt_cp = None
298 if vnfd.get("vnf-configuration"):
299 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
300 if ssh_required and vnfd.get("mgmt-interface"):
301 if vnfd["mgmt-interface"].get("vdu-id"):
302 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
303 elif vnfd["mgmt-interface"].get("cp"):
304 mgmt_cp = vnfd["mgmt-interface"]["cp"]
305
306 for vdu in vnfd.get("vdu", ()):
307 if vdu.get("vdu-configuration"):
308 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
309 if ssh_required:
310 vdu_needed_access.append(vdu["id"])
311 elif mgmt_cp:
312 for vdu_interface in vdu.get("interface"):
313 if vdu_interface.get("external-connection-point-ref") and \
314 vdu_interface["external-connection-point-ref"] == mgmt_cp:
315 vdu_needed_access.append(vdu["id"])
316 mgmt_cp = None
317 break
318
319 if vdu_needed_access:
320 for vnf_member in nsd.get("constituent-vnfd"):
321 if vnf_member["vnfd-id-ref"] != vnfd_ref:
322 continue
323 for vdu in vdu_needed_access:
324 populate_dict(RO_ns_params,
325 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
326 n2vc_key_list)
327
328 if ns_params.get("vduImage"):
329 RO_ns_params["vduImage"] = ns_params["vduImage"]
330
331 if ns_params.get("ssh_keys"):
332 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
333 for vnf_params in get_iterable(ns_params, "vnf"):
334 for constituent_vnfd in nsd["constituent-vnfd"]:
335 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
336 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
337 break
338 else:
339 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
340 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
341 if vnf_params.get("vimAccountId"):
342 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
343 vim_account_2_RO(vnf_params["vimAccountId"]))
344
345 for vdu_params in get_iterable(vnf_params, "vdu"):
346 # TODO feature 1417: check that this VDU exist and it is not a PDU
347 if vdu_params.get("volume"):
348 for volume_params in vdu_params["volume"]:
349 if volume_params.get("vim-volume-id"):
350 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
351 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
352 volume_params["vim-volume-id"])
353 if vdu_params.get("interface"):
354 for interface_params in vdu_params["interface"]:
355 if interface_params.get("ip-address"):
356 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
357 vdu_params["id"], "interfaces", interface_params["name"],
358 "ip_address"),
359 interface_params["ip-address"])
360 if interface_params.get("mac-address"):
361 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
362 vdu_params["id"], "interfaces", interface_params["name"],
363 "mac_address"),
364 interface_params["mac-address"])
365 if interface_params.get("floating-ip-required"):
366 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
367 vdu_params["id"], "interfaces", interface_params["name"],
368 "floating-ip"),
369 interface_params["floating-ip-required"])
370
371 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
372 if internal_vld_params.get("vim-network-name"):
373 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
374 internal_vld_params["name"], "vim-network-name"),
375 internal_vld_params["vim-network-name"])
376 if internal_vld_params.get("vim-network-id"):
377 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
378 internal_vld_params["name"], "vim-network-id"),
379 internal_vld_params["vim-network-id"])
380 if internal_vld_params.get("ip-profile"):
381 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
382 internal_vld_params["name"], "ip-profile"),
383 ip_profile_2_RO(internal_vld_params["ip-profile"]))
384 if internal_vld_params.get("provider-network"):
385
386 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
387 internal_vld_params["name"], "provider-network"),
388 internal_vld_params["provider-network"].copy())
389
390 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
391 # look for interface
392 iface_found = False
393 for vdu_descriptor in vnf_descriptor["vdu"]:
394 for vdu_interface in vdu_descriptor["interface"]:
395 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
396 if icp_params.get("ip-address"):
397 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
398 vdu_descriptor["id"], "interfaces",
399 vdu_interface["name"], "ip_address"),
400 icp_params["ip-address"])
401
402 if icp_params.get("mac-address"):
403 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
404 vdu_descriptor["id"], "interfaces",
405 vdu_interface["name"], "mac_address"),
406 icp_params["mac-address"])
407 iface_found = True
408 break
409 if iface_found:
410 break
411 else:
412 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
413 "internal-vld:id-ref={} is not present at vnfd:internal-"
414 "connection-point".format(vnf_params["member-vnf-index"],
415 icp_params["id-ref"]))
416
417 for vld_params in get_iterable(ns_params, "vld"):
418 if "ip-profile" in vld_params:
419 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
420 ip_profile_2_RO(vld_params["ip-profile"]))
421
422 if vld_params.get("provider-network"):
423
424 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
425 vld_params["provider-network"].copy())
426
427 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
428 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
429 wim_account_2_RO(vld_params["wimAccountId"])),
430 if vld_params.get("vim-network-name"):
431 RO_vld_sites = []
432 if isinstance(vld_params["vim-network-name"], dict):
433 for vim_account, vim_net in vld_params["vim-network-name"].items():
434 RO_vld_sites.append({
435 "netmap-use": vim_net,
436 "datacenter": vim_account_2_RO(vim_account)
437 })
438 else: # isinstance str
439 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
440 if RO_vld_sites:
441 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
442
443 if vld_params.get("vim-network-id"):
444 RO_vld_sites = []
445 if isinstance(vld_params["vim-network-id"], dict):
446 for vim_account, vim_net in vld_params["vim-network-id"].items():
447 RO_vld_sites.append({
448 "netmap-use": vim_net,
449 "datacenter": vim_account_2_RO(vim_account)
450 })
451 else: # isinstance str
452 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
453 if RO_vld_sites:
454 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
455 if vld_params.get("ns-net"):
456 if isinstance(vld_params["ns-net"], dict):
457 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
458 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
459 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
460 if "vnfd-connection-point-ref" in vld_params:
461 for cp_params in vld_params["vnfd-connection-point-ref"]:
462 # look for interface
463 for constituent_vnfd in nsd["constituent-vnfd"]:
464 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
465 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
466 break
467 else:
468 raise LcmException(
469 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
470 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
471 match_cp = False
472 for vdu_descriptor in vnf_descriptor["vdu"]:
473 for interface_descriptor in vdu_descriptor["interface"]:
474 if interface_descriptor.get("external-connection-point-ref") == \
475 cp_params["vnfd-connection-point-ref"]:
476 match_cp = True
477 break
478 if match_cp:
479 break
480 else:
481 raise LcmException(
482 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
483 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
484 cp_params["member-vnf-index-ref"],
485 cp_params["vnfd-connection-point-ref"],
486 vnf_descriptor["id"]))
487 if cp_params.get("ip-address"):
488 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
489 vdu_descriptor["id"], "interfaces",
490 interface_descriptor["name"], "ip_address"),
491 cp_params["ip-address"])
492 if cp_params.get("mac-address"):
493 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
494 vdu_descriptor["id"], "interfaces",
495 interface_descriptor["name"], "mac_address"),
496 cp_params["mac-address"])
497 return RO_ns_params
498
499 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
500 # make a copy to do not change
501 vdu_create = copy(vdu_create)
502 vdu_delete = copy(vdu_delete)
503
504 vdurs = db_vnfr.get("vdur")
505 if vdurs is None:
506 vdurs = []
507 vdu_index = len(vdurs)
508 while vdu_index:
509 vdu_index -= 1
510 vdur = vdurs[vdu_index]
511 if vdur.get("pdu-type"):
512 continue
513 vdu_id_ref = vdur["vdu-id-ref"]
514 if vdu_create and vdu_create.get(vdu_id_ref):
515 for index in range(0, vdu_create[vdu_id_ref]):
516 vdur = deepcopy(vdur)
517 vdur["_id"] = str(uuid4())
518 vdur["count-index"] += 1
519 vdurs.insert(vdu_index+1+index, vdur)
520 del vdu_create[vdu_id_ref]
521 if vdu_delete and vdu_delete.get(vdu_id_ref):
522 del vdurs[vdu_index]
523 vdu_delete[vdu_id_ref] -= 1
524 if not vdu_delete[vdu_id_ref]:
525 del vdu_delete[vdu_id_ref]
526 # check all operations are done
527 if vdu_create or vdu_delete:
528 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
529 vdu_create))
530 if vdu_delete:
531 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
532 vdu_delete))
533
534 vnfr_update = {"vdur": vdurs}
535 db_vnfr["vdur"] = vdurs
536 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
537
538 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
539 """
540 Updates database nsr with the RO info for the created vld
541 :param ns_update_nsr: dictionary to be filled with the updated info
542 :param db_nsr: content of db_nsr. This is also modified
543 :param nsr_desc_RO: nsr descriptor from RO
544 :return: Nothing, LcmException is raised on errors
545 """
546
547 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
548 for net_RO in get_iterable(nsr_desc_RO, "nets"):
549 if vld["id"] != net_RO.get("ns_net_osm_id"):
550 continue
551 vld["vim-id"] = net_RO.get("vim_net_id")
552 vld["name"] = net_RO.get("vim_name")
553 vld["status"] = net_RO.get("status")
554 vld["status-detailed"] = net_RO.get("error_msg")
555 ns_update_nsr["vld.{}".format(vld_index)] = vld
556 break
557 else:
558 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
559
560 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
561 """
562 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
563 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
564 :param nsr_desc_RO: nsr descriptor from RO
565 :return: Nothing, LcmException is raised on errors
566 """
567 for vnf_index, db_vnfr in db_vnfrs.items():
568 for vnf_RO in nsr_desc_RO["vnfs"]:
569 if vnf_RO["member_vnf_index"] != vnf_index:
570 continue
571 vnfr_update = {}
572 if vnf_RO.get("ip_address"):
573 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
574 elif not db_vnfr.get("ip-address"):
575 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
576
577 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
578 vdur_RO_count_index = 0
579 if vdur.get("pdu-type"):
580 continue
581 for vdur_RO in get_iterable(vnf_RO, "vms"):
582 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
583 continue
584 if vdur["count-index"] != vdur_RO_count_index:
585 vdur_RO_count_index += 1
586 continue
587 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
588 if vdur_RO.get("ip_address"):
589 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
590 else:
591 vdur["ip-address"] = None
592 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
593 vdur["name"] = vdur_RO.get("vim_name")
594 vdur["status"] = vdur_RO.get("status")
595 vdur["status-detailed"] = vdur_RO.get("error_msg")
596 for ifacer in get_iterable(vdur, "interfaces"):
597 for interface_RO in get_iterable(vdur_RO, "interfaces"):
598 if ifacer["name"] == interface_RO.get("internal_name"):
599 ifacer["ip-address"] = interface_RO.get("ip_address")
600 ifacer["mac-address"] = interface_RO.get("mac_address")
601 break
602 else:
603 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
604 "from VIM info"
605 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
606 vnfr_update["vdur.{}".format(vdu_index)] = vdur
607 break
608 else:
609 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
610 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
611
612 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
613 for net_RO in get_iterable(nsr_desc_RO, "nets"):
614 if vld["id"] != net_RO.get("vnf_net_osm_id"):
615 continue
616 vld["vim-id"] = net_RO.get("vim_net_id")
617 vld["name"] = net_RO.get("vim_name")
618 vld["status"] = net_RO.get("status")
619 vld["status-detailed"] = net_RO.get("error_msg")
620 vnfr_update["vld.{}".format(vld_index)] = vld
621 break
622 else:
623 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
624 vnf_index, vld["id"]))
625
626 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
627 break
628
629 else:
630 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
631
632 def _get_ns_config_info(self, nsr_id):
633 """
634 Generates a mapping between vnf,vdu elements and the N2VC id
635 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
636 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
637 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
638 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
639 """
640 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
641 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
642 mapping = {}
643 ns_config_info = {"osm-config-mapping": mapping}
644 for vca in vca_deployed_list:
645 if not vca["member-vnf-index"]:
646 continue
647 if not vca["vdu_id"]:
648 mapping[vca["member-vnf-index"]] = vca["application"]
649 else:
650 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
651 vca["application"]
652 return ns_config_info
653
654 @staticmethod
655 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
656 """
657 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
658 primitives as verify-ssh-credentials, or config when needed
659 :param desc_primitive_list: information of the descriptor
660 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
661 this element contains a ssh public key
662 :return: The modified list. Can ba an empty list, but always a list
663 """
664 if desc_primitive_list:
665 primitive_list = desc_primitive_list.copy()
666 else:
667 primitive_list = []
668 # look for primitive config, and get the position. None if not present
669 config_position = None
670 for index, primitive in enumerate(primitive_list):
671 if primitive["name"] == "config":
672 config_position = index
673 break
674
675 # for NS, add always a config primitive if not present (bug 874)
676 if not vca_deployed["member-vnf-index"] and config_position is None:
677 primitive_list.insert(0, {"name": "config", "parameter": []})
678 config_position = 0
679 # for VNF/VDU add verify-ssh-credentials after config
680 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
681 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
682 return primitive_list
683
684 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
685 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
686
687 db_nsr_update = {}
688 RO_descriptor_number = 0 # number of descriptors created at RO
689 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
690 start_deploy = time()
691 vdu_flag = False # If any of the VNFDs has VDUs
692 ns_params = db_nslcmop.get("operationParams")
693
694 # deploy RO
695
696 # get vnfds, instantiate at RO
697
698 for c_vnf in nsd.get("constituent-vnfd", ()):
699 member_vnf_index = c_vnf["member-vnf-index"]
700 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
701 if vnfd.get("vdu"):
702 vdu_flag = True
703 vnfd_ref = vnfd["id"]
704 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
705 " RO".format(vnfd_ref, member_vnf_index)
706 # self.logger.debug(logging_text + step)
707 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
708 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
709 RO_descriptor_number += 1
710
711 # look position at deployed.RO.vnfd if not present it will be appended at the end
712 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
713 if vnf_deployed["member-vnf-index"] == member_vnf_index:
714 break
715 else:
716 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
717 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
718
719 # look if present
720 RO_update = {"member-vnf-index": member_vnf_index}
721 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
722 if vnfd_list:
723 RO_update["id"] = vnfd_list[0]["uuid"]
724 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
725 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
726 else:
727 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
728 get("additionalParamsForVnf"), nsr_id)
729 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
730 RO_update["id"] = desc["uuid"]
731 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
732 vnfd_ref, member_vnf_index, desc["uuid"]))
733 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
734 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
735 self.update_db_2("nsrs", nsr_id, db_nsr_update)
736 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
737
738 # create nsd at RO
739 nsd_ref = nsd["id"]
740 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
741 # self.logger.debug(logging_text + step)
742
743 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
744 RO_descriptor_number += 1
745 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
746 if nsd_list:
747 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
748 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
749 nsd_ref, RO_nsd_uuid))
750 else:
751 nsd_RO = deepcopy(nsd)
752 nsd_RO["id"] = RO_osm_nsd_id
753 nsd_RO.pop("_id", None)
754 nsd_RO.pop("_admin", None)
755 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
756 member_vnf_index = c_vnf["member-vnf-index"]
757 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
758 for c_vld in nsd_RO.get("vld", ()):
759 for cp in c_vld.get("vnfd-connection-point-ref", ()):
760 member_vnf_index = cp["member-vnf-index-ref"]
761 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
762
763 desc = await self.RO.create("nsd", descriptor=nsd_RO)
764 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
765 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
766 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
767 self.update_db_2("nsrs", nsr_id, db_nsr_update)
768 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
769
770 # Crate ns at RO
771 # if present use it unless in error status
772 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
773 if RO_nsr_id:
774 try:
775 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
776 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
777 desc = await self.RO.show("ns", RO_nsr_id)
778 except ROclient.ROClientException as e:
779 if e.http_code != HTTPStatus.NOT_FOUND:
780 raise
781 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
782 if RO_nsr_id:
783 ns_status, ns_status_info = self.RO.check_ns_status(desc)
784 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
785 if ns_status == "ERROR":
786 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
787 .format(RO_nsr_id)
788 self.logger.debug(logging_text + step)
789 await self.RO.delete("ns", RO_nsr_id)
790 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
791 if not RO_nsr_id:
792 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
793 # self.logger.debug(logging_text + step)
794
795 # check if VIM is creating and wait look if previous tasks in process
796 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
797 if task_dependency:
798 step = "Waiting for related tasks to be completed: {}".format(task_name)
799 self.logger.debug(logging_text + step)
800 await asyncio.wait(task_dependency, timeout=3600)
801 if ns_params.get("vnf"):
802 for vnf in ns_params["vnf"]:
803 if "vimAccountId" in vnf:
804 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
805 vnf["vimAccountId"])
806 if task_dependency:
807 step = "Waiting for related tasks to be completed: {}".format(task_name)
808 self.logger.debug(logging_text + step)
809 await asyncio.wait(task_dependency, timeout=3600)
810
811 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
812
813 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
814
815 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
816 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
817 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
818 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
819 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
820 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
821 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
822 self.update_db_2("nsrs", nsr_id, db_nsr_update)
823 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
824
825 # wait until NS is ready
826 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
827 detailed_status_old = None
828 self.logger.debug(logging_text + step)
829
830 while time() <= start_deploy + self.total_deploy_timeout:
831 desc = await self.RO.show("ns", RO_nsr_id)
832 ns_status, ns_status_info = self.RO.check_ns_status(desc)
833 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
834 if ns_status == "ERROR":
835 raise ROclient.ROClientException(ns_status_info)
836 elif ns_status == "BUILD":
837 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
838 elif ns_status == "ACTIVE":
839 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
840 try:
841 if vdu_flag:
842 self.ns_update_vnfr(db_vnfrs, desc)
843 break
844 except LcmExceptionNoMgmtIP:
845 pass
846 else:
847 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
848 if detailed_status != detailed_status_old:
849 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
851 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
852 await asyncio.sleep(5, loop=self.loop)
853 else: # total_deploy_timeout
854 raise ROclient.ROClientException("Timeout waiting ns to be ready")
855
856 step = "Updating NSR"
857 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
858
859 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
860 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
861 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
862 self.update_db_2("nsrs", nsr_id, db_nsr_update)
863 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
864
865 step = "Deployed at VIM"
866 self.logger.debug(logging_text + step)
867
868 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
869 """
870 Wait for ip addres at RO, and optionally, insert public key in virtual machine
871 :param logging_text: prefix use for logging
872 :param nsr_id:
873 :param vnfr_id:
874 :param vdu_id:
875 :param vdu_index:
876 :param pub_key: public ssh key to inject, None to skip
877 :param user: user to apply the public ssh key
878 :return: IP address
879 """
880
881 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
882 ro_nsr_id = None
883 ip_address = None
884 nb_tries = 0
885 target_vdu_id = None
886 ro_retries = 0
887
888 while True:
889
890 ro_retries += 1
891 if ro_retries >= 360: # 1 hour
892 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
893
894 await asyncio.sleep(10, loop=self.loop)
895 # wait until NS is deployed at RO
896 if not ro_nsr_id:
897 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
898 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
899 if not ro_nsr_id:
900 continue
901
902 # get ip address
903 if not target_vdu_id:
904 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
905
906 if not vdu_id: # for the VNF case
907 ip_address = db_vnfr.get("ip-address")
908 if not ip_address:
909 continue
910 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
911 else: # VDU case
912 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
913 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
914
915 if not vdur:
916 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
917 vnfr_id, vdu_id, vdu_index
918 ))
919
920 if vdur.get("status") == "ACTIVE":
921 ip_address = vdur.get("ip-address")
922 if not ip_address:
923 continue
924 target_vdu_id = vdur["vdu-id-ref"]
925 elif vdur.get("status") == "ERROR":
926 raise LcmException("Cannot inject ssh-key because target VM is in error state")
927
928 if not target_vdu_id:
929 continue
930
931 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
932
933 # inject public key into machine
934 if pub_key and user:
935 # self.logger.debug(logging_text + "Inserting RO key")
936 try:
937 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
938 result_dict = await self.RO.create_action(
939 item="ns",
940 item_id_name=ro_nsr_id,
941 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
942 )
943 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
944 if not result_dict or not isinstance(result_dict, dict):
945 raise LcmException("Unknown response from RO when injecting key")
946 for result in result_dict.values():
947 if result.get("vim_result") == 200:
948 break
949 else:
950 raise ROclient.ROClientException("error injecting key: {}".format(
951 result.get("description")))
952 break
953 except ROclient.ROClientException as e:
954 if not nb_tries:
955 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
956 format(e, 20*10))
957 nb_tries += 1
958 if nb_tries >= 20:
959 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
960 else:
961 break
962
963 return ip_address
964
965 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
966 """
967 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
968 """
969 my_vca = vca_deployed_list[vca_index]
970 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
971 return
972 timeout = 300
973 while timeout >= 0:
974 for index, vca_deployed in enumerate(vca_deployed_list):
975 if index == vca_index:
976 continue
977 if not my_vca.get("member-vnf-index") or \
978 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
979 if not vca_deployed.get("instantiation"):
980 break # wait
981 if vca_deployed["instantiation"] == "FAILED":
982 raise LcmException("Configuration aborted because dependent charm/s has failed")
983 else:
984 return
985 await asyncio.sleep(10)
986 timeout -= 1
987 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
988 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
989
990 raise LcmException("Configuration aborted because dependent charm/s timeout")
991
992 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
993 kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
994 nsr_id = db_nsr["_id"]
995 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
996 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
997 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
998 db_dict = {
999 'collection': 'nsrs',
1000 'filter': {'_id': nsr_id},
1001 'path': db_update_entry
1002 }
1003 step = ""
1004 try:
1005 vnfr_id = None
1006 if db_vnfr:
1007 vnfr_id = db_vnfr["_id"]
1008
1009 namespace = "{nsi}.{ns}".format(
1010 nsi=nsi_id if nsi_id else "",
1011 ns=nsr_id)
1012 if vnfr_id:
1013 namespace += "." + vnfr_id
1014 if vdu_id:
1015 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1016
1017 # Get artifact path
1018 artifact_path = "{}/{}/charms/{}".format(
1019 base_folder["folder"],
1020 base_folder["pkg-dir"],
1021 config_descriptor["juju"]["charm"]
1022 )
1023
1024 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1025 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1026 is_proxy_charm = False
1027
1028 # n2vc_redesign STEP 3.1
1029
1030 # find old ee_id if exists
1031 ee_id = vca_deployed.get("ee_id")
1032
1033 # create or register execution environment in VCA
1034 if is_proxy_charm:
1035 step = "create execution environment"
1036 self.logger.debug(logging_text + step)
1037 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1038 reuse_ee_id=ee_id,
1039 db_dict=db_dict)
1040 else:
1041 step = "Waiting to VM being up and getting IP address"
1042 self.logger.debug(logging_text + step)
1043 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1044 user=None, pub_key=None)
1045 credentials = {"hostname": rw_mgmt_ip}
1046 # get username
1047 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1048 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1049 # merged. Meanwhile let's get username from initial-config-primitive
1050 if not username and config_descriptor.get("initial-config-primitive"):
1051 for config_primitive in config_descriptor["initial-config-primitive"]:
1052 for param in config_primitive.get("parameter", ()):
1053 if param["name"] == "ssh-username":
1054 username = param["value"]
1055 break
1056 if not username:
1057 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1058 "'config-access.ssh-access.default-user'")
1059 credentials["username"] = username
1060 # n2vc_redesign STEP 3.2
1061
1062 step = "register execution environment {}".format(credentials)
1063 self.logger.debug(logging_text + step)
1064 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1065 db_dict=db_dict)
1066
1067 # for compatibility with MON/POL modules, the need model and application name at database
1068 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1069 ee_id_parts = ee_id.split('.')
1070 model_name = ee_id_parts[0]
1071 application_name = ee_id_parts[1]
1072 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1073 db_update_entry + "application": application_name,
1074 db_update_entry + "ee_id": ee_id})
1075
1076 # n2vc_redesign STEP 3.3
1077
1078 step = "Install configuration Software"
1079 # TODO check if already done
1080 self.logger.debug(logging_text + step)
1081 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1082
1083 # if SSH access is required, then get execution environment SSH public
1084 if is_proxy_charm: # if native charm we have waited already to VM be UP
1085 pub_key = None
1086 user = None
1087 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1088 # Needed to inject a ssh key
1089 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1090 step = "Install configuration Software, getting public ssh key"
1091 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1092
1093 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1094 else:
1095 step = "Waiting to VM being up and getting IP address"
1096 self.logger.debug(logging_text + step)
1097
1098 # n2vc_redesign STEP 5.1
1099 # wait for RO (ip-address) Insert pub_key into VM
1100 if vnfr_id:
1101 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1102 user=user, pub_key=pub_key)
1103 else:
1104 rw_mgmt_ip = None # This is for a NS configuration
1105
1106 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1107
1108 # store rw_mgmt_ip in deploy params for later replacement
1109 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1110
1111 # n2vc_redesign STEP 6 Execute initial config primitive
1112 step = 'execute initial config primitive'
1113 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1114
1115 # sort initial config primitives by 'seq'
1116 try:
1117 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1118 except Exception as e:
1119 self.logger.error(logging_text + step + ": " + str(e))
1120
1121 # add config if not present for NS charm
1122 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1123 vca_deployed)
1124 if initial_config_primitive_list:
1125 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1126 for initial_config_primitive in initial_config_primitive_list:
1127 # adding information on the vca_deployed if it is a NS execution environment
1128 if not vca_deployed["member-vnf-index"]:
1129 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1130 # TODO check if already done
1131 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1132
1133 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1134 self.logger.debug(logging_text + step)
1135 await self.n2vc.exec_primitive(
1136 ee_id=ee_id,
1137 primitive_name=initial_config_primitive["name"],
1138 params_dict=primitive_params_,
1139 db_dict=db_dict
1140 )
1141 # TODO register in database that primitive is done
1142
1143 step = "instantiated at VCA"
1144 self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "COMPLETED"})
1145 self.logger.debug(logging_text + step)
1146
1147 except Exception as e: # TODO not use Exception but N2VC exception
1148 self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1149 raise Exception("{} {}".format(step, e)) from e
1150 # TODO raise N2VC exception with 'step' extra information
1151
1152 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1153 error_description: str = None, error_detail: str = None):
1154 try:
1155 db_dict = dict()
1156 if ns_state:
1157 db_dict["nsState"] = ns_state
1158 db_dict["currentOperation"] = current_operation
1159 db_dict["currentOperationID"] = current_operation_id
1160 db_dict["errorDescription"] = error_description
1161 db_dict["errorDetail"] = error_detail
1162 self.update_db_2("nsrs", nsr_id, db_dict)
1163 except Exception as e:
1164 self.logger.warn('Error writing NS status: {}'.format(e))
1165
1166 async def instantiate(self, nsr_id, nslcmop_id):
1167 """
1168
1169 :param nsr_id: ns instance to deploy
1170 :param nslcmop_id: operation to run
1171 :return:
1172 """
1173
1174 # Try to lock HA task here
1175 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1176 if not task_is_locked_by_me:
1177 self.logger.debug('instantiate() task is not locked by me')
1178 return
1179
1180 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1181 self.logger.debug(logging_text + "Enter")
1182
1183 # get all needed from database
1184
1185 # database nsrs record
1186 db_nsr = None
1187
1188 # database nslcmops record
1189 db_nslcmop = None
1190
1191 # update operation on nsrs
1192 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1193 "_admin.current-operation": nslcmop_id,
1194 "_admin.operation-type": "instantiate"}
1195 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1196
1197 # update operation on nslcmops
1198 db_nslcmop_update = {}
1199
1200 nslcmop_operation_state = None
1201 db_vnfrs = {} # vnf's info indexed by member-index
1202 # n2vc_info = {}
1203 task_instantiation_list = []
1204 task_instantiation_info = {} # from task to info text
1205 exc = None
1206 try:
1207 # wait for any previous tasks in process
1208 step = "Waiting for previous operations to terminate"
1209 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1210
1211 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1212
1213 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1214 self._write_ns_status(
1215 nsr_id=nsr_id,
1216 ns_state="BUILDING",
1217 current_operation="INSTANTIATING",
1218 current_operation_id=nslcmop_id
1219 )
1220
1221 # read from db: operation
1222 step = "Getting nslcmop={} from db".format(nslcmop_id)
1223 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1224
1225 # read from db: ns
1226 step = "Getting nsr={} from db".format(nsr_id)
1227 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1228 # nsd is replicated into ns (no db read)
1229 nsd = db_nsr["nsd"]
1230 # nsr_name = db_nsr["name"] # TODO short-name??
1231
1232 # read from db: vnf's of this ns
1233 step = "Getting vnfrs from db"
1234 self.logger.debug(logging_text + step)
1235 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1236
1237 # read from db: vnfd's for every vnf
1238 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1239 db_vnfds = {} # every vnfd data indexed by vnf id
1240 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1241
1242 # for each vnf in ns, read vnfd
1243 for vnfr in db_vnfrs_list:
1244 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1245 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1246 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1247 # if we haven't this vnfd, read it from db
1248 if vnfd_id not in db_vnfds:
1249 # read from cb
1250 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1251 self.logger.debug(logging_text + step)
1252 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1253
1254 # store vnfd
1255 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1256 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1257 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1258
1259 # Get or generates the _admin.deployed.VCA list
1260 vca_deployed_list = None
1261 if db_nsr["_admin"].get("deployed"):
1262 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1263 if vca_deployed_list is None:
1264 vca_deployed_list = []
1265 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1266 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1267 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1268 elif isinstance(vca_deployed_list, dict):
1269 # maintain backward compatibility. Change a dict to list at database
1270 vca_deployed_list = list(vca_deployed_list.values())
1271 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1272 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1273
1274 db_nsr_update["detailed-status"] = "creating"
1275 db_nsr_update["operational-status"] = "init"
1276
1277 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1278 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1279 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1280
1281 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1282 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1283 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1284 self.logger.debug(logging_text + "Before deploy_kdus")
1285 # Call to deploy_kdus in case exists the "vdu:kdu" param
1286 task_kdu = asyncio.ensure_future(
1287 self.deploy_kdus(
1288 logging_text=logging_text,
1289 nsr_id=nsr_id,
1290 db_nsr=db_nsr,
1291 db_vnfrs=db_vnfrs,
1292 )
1293 )
1294 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1295 task_instantiation_info[task_kdu] = "Deploy KDUs"
1296 task_instantiation_list.append(task_kdu)
1297 # n2vc_redesign STEP 1 Get VCA public ssh-key
1298 # feature 1429. Add n2vc public key to needed VMs
1299 n2vc_key = self.n2vc.get_public_key()
1300 n2vc_key_list = [n2vc_key]
1301 if self.vca_config.get("public_key"):
1302 n2vc_key_list.append(self.vca_config["public_key"])
1303
1304 # n2vc_redesign STEP 2 Deploy Network Scenario
1305 task_ro = asyncio.ensure_future(
1306 self.instantiate_RO(
1307 logging_text=logging_text,
1308 nsr_id=nsr_id,
1309 nsd=nsd,
1310 db_nsr=db_nsr,
1311 db_nslcmop=db_nslcmop,
1312 db_vnfrs=db_vnfrs,
1313 db_vnfds_ref=db_vnfds_ref,
1314 n2vc_key_list=n2vc_key_list
1315 )
1316 )
1317 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1318 task_instantiation_info[task_ro] = "Deploy at VIM"
1319 task_instantiation_list.append(task_ro)
1320
1321 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1322 step = "Looking for needed vnfd to configure with proxy charm"
1323 self.logger.debug(logging_text + step)
1324
1325 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1326 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1327 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1328 vnfd_id = c_vnf["vnfd-id-ref"]
1329 vnfd = db_vnfds_ref[vnfd_id]
1330 member_vnf_index = str(c_vnf["member-vnf-index"])
1331 db_vnfr = db_vnfrs[member_vnf_index]
1332 base_folder = vnfd["_admin"]["storage"]
1333 vdu_id = None
1334 vdu_index = 0
1335 vdu_name = None
1336 kdu_name = None
1337
1338 # Get additional parameters
1339 deploy_params = {}
1340 if db_vnfr.get("additionalParamsForVnf"):
1341 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1342
1343 descriptor_config = vnfd.get("vnf-configuration")
1344 if descriptor_config and descriptor_config.get("juju"):
1345 self._deploy_n2vc(
1346 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1347 db_nsr=db_nsr,
1348 db_vnfr=db_vnfr,
1349 nslcmop_id=nslcmop_id,
1350 nsr_id=nsr_id,
1351 nsi_id=nsi_id,
1352 vnfd_id=vnfd_id,
1353 vdu_id=vdu_id,
1354 kdu_name=kdu_name,
1355 member_vnf_index=member_vnf_index,
1356 vdu_index=vdu_index,
1357 vdu_name=vdu_name,
1358 deploy_params=deploy_params,
1359 descriptor_config=descriptor_config,
1360 base_folder=base_folder,
1361 task_instantiation_list=task_instantiation_list,
1362 task_instantiation_info=task_instantiation_info
1363 )
1364
1365 # Deploy charms for each VDU that supports one.
1366 for vdud in get_iterable(vnfd, 'vdu'):
1367 vdu_id = vdud["id"]
1368 descriptor_config = vdud.get('vdu-configuration')
1369 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1370 if vdur.get("additionalParams"):
1371 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1372 else:
1373 deploy_params_vdu = deploy_params
1374 if descriptor_config and descriptor_config.get("juju"):
1375 # look for vdu index in the db_vnfr["vdu"] section
1376 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1377 # if vdur["vdu-id-ref"] == vdu_id:
1378 # break
1379 # else:
1380 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1381 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1382 # vdu_name = vdur.get("name")
1383 vdu_name = None
1384 kdu_name = None
1385 for vdu_index in range(int(vdud.get("count", 1))):
1386 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1387 self._deploy_n2vc(
1388 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1389 member_vnf_index, vdu_id, vdu_index),
1390 db_nsr=db_nsr,
1391 db_vnfr=db_vnfr,
1392 nslcmop_id=nslcmop_id,
1393 nsr_id=nsr_id,
1394 nsi_id=nsi_id,
1395 vnfd_id=vnfd_id,
1396 vdu_id=vdu_id,
1397 kdu_name=kdu_name,
1398 member_vnf_index=member_vnf_index,
1399 vdu_index=vdu_index,
1400 vdu_name=vdu_name,
1401 deploy_params=deploy_params_vdu,
1402 descriptor_config=descriptor_config,
1403 base_folder=base_folder,
1404 task_instantiation_list=task_instantiation_list,
1405 task_instantiation_info=task_instantiation_info
1406 )
1407 for kdud in get_iterable(vnfd, 'kdu'):
1408 kdu_name = kdud["name"]
1409 descriptor_config = kdud.get('kdu-configuration')
1410 if descriptor_config and descriptor_config.get("juju"):
1411 vdu_id = None
1412 vdu_index = 0
1413 vdu_name = None
1414 # look for vdu index in the db_vnfr["vdu"] section
1415 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1416 # if vdur["vdu-id-ref"] == vdu_id:
1417 # break
1418 # else:
1419 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1420 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1421 # vdu_name = vdur.get("name")
1422 # vdu_name = None
1423
1424 self._deploy_n2vc(
1425 logging_text=logging_text,
1426 db_nsr=db_nsr,
1427 db_vnfr=db_vnfr,
1428 nslcmop_id=nslcmop_id,
1429 nsr_id=nsr_id,
1430 nsi_id=nsi_id,
1431 vnfd_id=vnfd_id,
1432 vdu_id=vdu_id,
1433 kdu_name=kdu_name,
1434 member_vnf_index=member_vnf_index,
1435 vdu_index=vdu_index,
1436 vdu_name=vdu_name,
1437 deploy_params=deploy_params,
1438 descriptor_config=descriptor_config,
1439 base_folder=base_folder,
1440 task_instantiation_list=task_instantiation_list,
1441 task_instantiation_info=task_instantiation_info
1442 )
1443
1444 # Check if this NS has a charm configuration
1445 descriptor_config = nsd.get("ns-configuration")
1446 if descriptor_config and descriptor_config.get("juju"):
1447 vnfd_id = None
1448 db_vnfr = None
1449 member_vnf_index = None
1450 vdu_id = None
1451 kdu_name = None
1452 vdu_index = 0
1453 vdu_name = None
1454
1455 # Get additional parameters
1456 deploy_params = {}
1457 if db_nsr.get("additionalParamsForNs"):
1458 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1459 base_folder = nsd["_admin"]["storage"]
1460 self._deploy_n2vc(
1461 logging_text=logging_text,
1462 db_nsr=db_nsr,
1463 db_vnfr=db_vnfr,
1464 nslcmop_id=nslcmop_id,
1465 nsr_id=nsr_id,
1466 nsi_id=nsi_id,
1467 vnfd_id=vnfd_id,
1468 vdu_id=vdu_id,
1469 kdu_name=kdu_name,
1470 member_vnf_index=member_vnf_index,
1471 vdu_index=vdu_index,
1472 vdu_name=vdu_name,
1473 deploy_params=deploy_params,
1474 descriptor_config=descriptor_config,
1475 base_folder=base_folder,
1476 task_instantiation_list=task_instantiation_list,
1477 task_instantiation_info=task_instantiation_info
1478 )
1479
1480 # Wait until all tasks of "task_instantiation_list" have been finished
1481
1482 # while time() <= start_deploy + self.total_deploy_timeout:
1483 error_text_list = []
1484 timeout = 3600
1485
1486 # let's begin with all OK
1487 instantiated_ok = True
1488 # let's begin with RO 'running' status (later we can change it)
1489 db_nsr_update["operational-status"] = "running"
1490 # let's begin with VCA 'configured' status (later we can change it)
1491 db_nsr_update["config-status"] = "configured"
1492
1493 if task_instantiation_list:
1494 # wait for all tasks completion
1495 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1496
1497 for task in pending:
1498 instantiated_ok = False
1499 if task == task_ro:
1500 db_nsr_update["operational-status"] = "failed"
1501 else:
1502 db_nsr_update["config-status"] = "failed"
1503 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1504 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1505 for task in done:
1506 if task.cancelled():
1507 instantiated_ok = False
1508 if task == task_ro:
1509 db_nsr_update["operational-status"] = "failed"
1510 else:
1511 db_nsr_update["config-status"] = "failed"
1512 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1513 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1514 else:
1515 exc = task.exception()
1516 if exc:
1517 instantiated_ok = False
1518 if task == task_ro:
1519 db_nsr_update["operational-status"] = "failed"
1520 else:
1521 db_nsr_update["config-status"] = "failed"
1522 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1523 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1524 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1525 else:
1526 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1527 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1528 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1529 else:
1530 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1531
1532 if error_text_list:
1533 error_text = "\n".join(error_text_list)
1534 db_nsr_update["detailed-status"] = error_text
1535 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1536 db_nslcmop_update["detailed-status"] = error_text
1537 db_nslcmop_update["statusEnteredTime"] = time()
1538 else:
1539 # all is done
1540 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1541 db_nslcmop_update["statusEnteredTime"] = time()
1542 db_nslcmop_update["detailed-status"] = "done"
1543 db_nsr_update["detailed-status"] = "done"
1544
1545 except (ROclient.ROClientException, DbException, LcmException) as e:
1546 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1547 exc = e
1548 except asyncio.CancelledError:
1549 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1550 exc = "Operation was cancelled"
1551 except Exception as e:
1552 exc = traceback.format_exc()
1553 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1554 exc_info=True)
1555 finally:
1556 if exc:
1557 if db_nsr:
1558 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1559 db_nsr_update["operational-status"] = "failed"
1560 db_nsr_update["config-status"] = "failed"
1561 if db_nslcmop:
1562 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1563 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1564 db_nslcmop_update["statusEnteredTime"] = time()
1565 try:
1566 if db_nsr:
1567 db_nsr_update["_admin.nslcmop"] = None
1568 db_nsr_update["_admin.current-operation"] = None
1569 db_nsr_update["_admin.operation-type"] = None
1570 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1571
1572 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1573 ns_state = None
1574 error_description = None
1575 error_detail = None
1576 if instantiated_ok:
1577 ns_state = "READY"
1578 else:
1579 ns_state = "BROKEN"
1580 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1581 error_detail = error_text
1582 self._write_ns_status(
1583 nsr_id=nsr_id,
1584 ns_state=ns_state,
1585 current_operation="IDLE",
1586 current_operation_id=None,
1587 error_description=error_description,
1588 error_detail=error_detail
1589 )
1590
1591 if db_nslcmop_update:
1592 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1593 except DbException as e:
1594 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1595 if nslcmop_operation_state:
1596 try:
1597 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1598 "operationState": nslcmop_operation_state},
1599 loop=self.loop)
1600 except Exception as e:
1601 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1602
1603 self.logger.debug(logging_text + "Exit")
1604 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1605
1606 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1607 # Launch kdus if present in the descriptor
1608
1609 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1610
1611 def _get_cluster_id(cluster_id, cluster_type):
1612 nonlocal k8scluster_id_2_uuic
1613 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1614 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1615
1616 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1617 if not db_k8scluster:
1618 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1619 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1620 if not k8s_id:
1621 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1622 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1623 return k8s_id
1624
1625 logging_text += "Deploy kdus: "
1626 try:
1627 db_nsr_update = {"_admin.deployed.K8s": []}
1628 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1629
1630 # Look for all vnfds
1631 pending_tasks = {}
1632 index = 0
1633 for vnfr_data in db_vnfrs.values():
1634 for kdur in get_iterable(vnfr_data, "kdur"):
1635 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1636 kdumodel = None
1637 k8sclustertype = None
1638 error_text = None
1639 cluster_uuid = None
1640 if kdur.get("helm-chart"):
1641 kdumodel = kdur["helm-chart"]
1642 k8sclustertype = "chart"
1643 k8sclustertype_full = "helm-chart"
1644 elif kdur.get("juju-bundle"):
1645 kdumodel = kdur["juju-bundle"]
1646 k8sclustertype = "juju"
1647 k8sclustertype_full = "juju-bundle"
1648 else:
1649 error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
1650 " running"
1651 try:
1652 if not error_text:
1653 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1654 except LcmException as e:
1655 error_text = str(e)
1656 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1657
1658 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1659 "k8scluster-type": k8sclustertype,
1660 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1661 if error_text:
1662 k8s_instace_info["detailed-status"] = error_text
1663 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1664 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1665 if error_text:
1666 continue
1667
1668 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1669 "{}".format(index)}
1670 if k8sclustertype == "chart":
1671 task = asyncio.ensure_future(
1672 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1673 params=desc_params, db_dict=db_dict, timeout=3600)
1674 )
1675 else:
1676 task = asyncio.ensure_future(
1677 self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1678 atomic=True, params=desc_params,
1679 db_dict=db_dict, timeout=600)
1680 )
1681
1682 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1683 index += 1
1684 if not pending_tasks:
1685 return
1686 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1687 pending_list = list(pending_tasks.keys())
1688 while pending_list:
1689 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1690 return_when=asyncio.FIRST_COMPLETED)
1691 if not done_list: # timeout
1692 for task in pending_list:
1693 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1694 break
1695 for task in done_list:
1696 exc = task.exception()
1697 if exc:
1698 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1699 else:
1700 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1701
1702 except Exception as e:
1703 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1704 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1705 finally:
1706 # TODO Write in data base
1707 if db_nsr_update:
1708 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1709
1710 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1711 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1712 base_folder, task_instantiation_list, task_instantiation_info):
1713 # launch instantiate_N2VC in a asyncio task and register task object
1714 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1715 # if not found, create one entry and update database
1716
1717 # fill db_nsr._admin.deployed.VCA.<index>
1718 vca_index = -1
1719 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1720 if not vca_deployed:
1721 continue
1722 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1723 vca_deployed.get("vdu_id") == vdu_id and \
1724 vca_deployed.get("kdu_name") == kdu_name and \
1725 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1726 break
1727 else:
1728 # not found, create one.
1729 vca_deployed = {
1730 "member-vnf-index": member_vnf_index,
1731 "vdu_id": vdu_id,
1732 "kdu_name": kdu_name,
1733 "vdu_count_index": vdu_index,
1734 "operational-status": "init", # TODO revise
1735 "detailed-status": "", # TODO revise
1736 "step": "initial-deploy", # TODO revise
1737 "vnfd_id": vnfd_id,
1738 "vdu_name": vdu_name,
1739 }
1740 vca_index += 1
1741 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1742 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1743
1744 # Launch task
1745 task_n2vc = asyncio.ensure_future(
1746 self.instantiate_N2VC(
1747 logging_text=logging_text,
1748 vca_index=vca_index,
1749 nsi_id=nsi_id,
1750 db_nsr=db_nsr,
1751 db_vnfr=db_vnfr,
1752 vdu_id=vdu_id,
1753 kdu_name=kdu_name,
1754 vdu_index=vdu_index,
1755 deploy_params=deploy_params,
1756 config_descriptor=descriptor_config,
1757 base_folder=base_folder,
1758 )
1759 )
1760 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1761 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1762 task_instantiation_list.append(task_n2vc)
1763
1764 # Check if this VNFD has a configured terminate action
1765 def _has_terminate_config_primitive(self, vnfd):
1766 vnf_config = vnfd.get("vnf-configuration")
1767 if vnf_config and vnf_config.get("terminate-config-primitive"):
1768 return True
1769 else:
1770 return False
1771
1772 @staticmethod
1773 def _get_terminate_config_primitive_seq_list(vnfd):
1774 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1775 # No need to check for existing primitive twice, already done before
1776 vnf_config = vnfd.get("vnf-configuration")
1777 seq_list = vnf_config.get("terminate-config-primitive")
1778 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1779 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1780 return seq_list_sorted
1781
1782 @staticmethod
1783 def _create_nslcmop(nsr_id, operation, params):
1784 """
1785 Creates a ns-lcm-opp content to be stored at database.
1786 :param nsr_id: internal id of the instance
1787 :param operation: instantiate, terminate, scale, action, ...
1788 :param params: user parameters for the operation
1789 :return: dictionary following SOL005 format
1790 """
1791 # Raise exception if invalid arguments
1792 if not (nsr_id and operation and params):
1793 raise LcmException(
1794 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1795 now = time()
1796 _id = str(uuid4())
1797 nslcmop = {
1798 "id": _id,
1799 "_id": _id,
1800 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1801 "operationState": "PROCESSING",
1802 "statusEnteredTime": now,
1803 "nsInstanceId": nsr_id,
1804 "lcmOperationType": operation,
1805 "startTime": now,
1806 "isAutomaticInvocation": False,
1807 "operationParams": params,
1808 "isCancelPending": False,
1809 "links": {
1810 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1811 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1812 }
1813 }
1814 return nslcmop
1815
1816 def _format_additional_params(self, params):
1817 params = params or {}
1818 for key, value in params.items():
1819 if str(value).startswith("!!yaml "):
1820 params[key] = yaml.safe_load(value[7:])
1821 return params
1822
1823 def _get_terminate_primitive_params(self, seq, vnf_index):
1824 primitive = seq.get('name')
1825 primitive_params = {}
1826 params = {
1827 "member_vnf_index": vnf_index,
1828 "primitive": primitive,
1829 "primitive_params": primitive_params,
1830 }
1831 desc_params = {}
1832 return self._map_primitive_params(seq, params, desc_params)
1833
1834 # sub-operations
1835
1836 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1837 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1838 if (op.get('operationState') == 'COMPLETED'):
1839 # b. Skip sub-operation
1840 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1841 return self.SUBOPERATION_STATUS_SKIP
1842 else:
1843 # c. Reintent executing sub-operation
1844 # The sub-operation exists, and operationState != 'COMPLETED'
1845 # Update operationState = 'PROCESSING' to indicate a reintent.
1846 operationState = 'PROCESSING'
1847 detailed_status = 'In progress'
1848 self._update_suboperation_status(
1849 db_nslcmop, op_index, operationState, detailed_status)
1850 # Return the sub-operation index
1851 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1852 # with arguments extracted from the sub-operation
1853 return op_index
1854
1855 # Find a sub-operation where all keys in a matching dictionary must match
1856 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1857 def _find_suboperation(self, db_nslcmop, match):
1858 if (db_nslcmop and match):
1859 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1860 for i, op in enumerate(op_list):
1861 if all(op.get(k) == match[k] for k in match):
1862 return i
1863 return self.SUBOPERATION_STATUS_NOT_FOUND
1864
1865 # Update status for a sub-operation given its index
1866 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1867 # Update DB for HA tasks
1868 q_filter = {'_id': db_nslcmop['_id']}
1869 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1870 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1871 self.db.set_one("nslcmops",
1872 q_filter=q_filter,
1873 update_dict=update_dict,
1874 fail_on_empty=False)
1875
1876 # Add sub-operation, return the index of the added sub-operation
1877 # Optionally, set operationState, detailed-status, and operationType
1878 # Status and type are currently set for 'scale' sub-operations:
1879 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1880 # 'detailed-status' : status message
1881 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1882 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1883 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1884 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1885 RO_nsr_id=None, RO_scaling_info=None):
1886 if not (db_nslcmop):
1887 return self.SUBOPERATION_STATUS_NOT_FOUND
1888 # Get the "_admin.operations" list, if it exists
1889 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1890 op_list = db_nslcmop_admin.get('operations')
1891 # Create or append to the "_admin.operations" list
1892 new_op = {'member_vnf_index': vnf_index,
1893 'vdu_id': vdu_id,
1894 'vdu_count_index': vdu_count_index,
1895 'primitive': primitive,
1896 'primitive_params': mapped_primitive_params}
1897 if operationState:
1898 new_op['operationState'] = operationState
1899 if detailed_status:
1900 new_op['detailed-status'] = detailed_status
1901 if operationType:
1902 new_op['lcmOperationType'] = operationType
1903 if RO_nsr_id:
1904 new_op['RO_nsr_id'] = RO_nsr_id
1905 if RO_scaling_info:
1906 new_op['RO_scaling_info'] = RO_scaling_info
1907 if not op_list:
1908 # No existing operations, create key 'operations' with current operation as first list element
1909 db_nslcmop_admin.update({'operations': [new_op]})
1910 op_list = db_nslcmop_admin.get('operations')
1911 else:
1912 # Existing operations, append operation to list
1913 op_list.append(new_op)
1914
1915 db_nslcmop_update = {'_admin.operations': op_list}
1916 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1917 op_index = len(op_list) - 1
1918 return op_index
1919
1920 # Helper methods for scale() sub-operations
1921
1922 # pre-scale/post-scale:
1923 # Check for 3 different cases:
1924 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1925 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1926 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1927 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1928 operationType, RO_nsr_id=None, RO_scaling_info=None):
1929 # Find this sub-operation
1930 if (RO_nsr_id and RO_scaling_info):
1931 operationType = 'SCALE-RO'
1932 match = {
1933 'member_vnf_index': vnf_index,
1934 'RO_nsr_id': RO_nsr_id,
1935 'RO_scaling_info': RO_scaling_info,
1936 }
1937 else:
1938 match = {
1939 'member_vnf_index': vnf_index,
1940 'primitive': vnf_config_primitive,
1941 'primitive_params': primitive_params,
1942 'lcmOperationType': operationType
1943 }
1944 op_index = self._find_suboperation(db_nslcmop, match)
1945 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1946 # a. New sub-operation
1947 # The sub-operation does not exist, add it.
1948 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1949 # The following parameters are set to None for all kind of scaling:
1950 vdu_id = None
1951 vdu_count_index = None
1952 vdu_name = None
1953 if (RO_nsr_id and RO_scaling_info):
1954 vnf_config_primitive = None
1955 primitive_params = None
1956 else:
1957 RO_nsr_id = None
1958 RO_scaling_info = None
1959 # Initial status for sub-operation
1960 operationState = 'PROCESSING'
1961 detailed_status = 'In progress'
1962 # Add sub-operation for pre/post-scaling (zero or more operations)
1963 self._add_suboperation(db_nslcmop,
1964 vnf_index,
1965 vdu_id,
1966 vdu_count_index,
1967 vdu_name,
1968 vnf_config_primitive,
1969 primitive_params,
1970 operationState,
1971 detailed_status,
1972 operationType,
1973 RO_nsr_id,
1974 RO_scaling_info)
1975 return self.SUBOPERATION_STATUS_NEW
1976 else:
1977 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1978 # or op_index (operationState != 'COMPLETED')
1979 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1980
1981 # Function to return execution_environment id
1982
1983 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
1984 for vca in vca_deployed_list:
1985 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
1986 return vca["ee_id"]
1987
1988 # Helper methods for terminate()
1989
1990 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1991 """ Create a primitive with params from VNFD
1992 Called from terminate() before deleting instance
1993 Calls action() to execute the primitive """
1994 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1995 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1996 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1997 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1998 db_vnfds = {}
1999 # Loop over VNFRs
2000 for vnfr in db_vnfrs_list:
2001 vnfd_id = vnfr["vnfd-id"]
2002 vnf_index = vnfr["member-vnf-index-ref"]
2003 if vnfd_id not in db_vnfds:
2004 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2005 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2006 db_vnfds[vnfd_id] = vnfd
2007 vnfd = db_vnfds[vnfd_id]
2008 if not self._has_terminate_config_primitive(vnfd):
2009 continue
2010 # Get the primitive's sorted sequence list
2011 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2012 for seq in seq_list:
2013 # For each sequence in list, get primitive and call _ns_execute_primitive()
2014 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2015 vnf_index, seq.get("name"))
2016 self.logger.debug(logging_text + step)
2017 # Create the primitive for each sequence, i.e. "primitive": "touch"
2018 primitive = seq.get('name')
2019 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2020 # The following 3 parameters are currently set to None for 'terminate':
2021 # vdu_id, vdu_count_index, vdu_name
2022 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2023 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2024 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2025 # Add sub-operation
2026 self._add_suboperation(db_nslcmop,
2027 nslcmop_id,
2028 vnf_index,
2029 vdu_id,
2030 vdu_count_index,
2031 vdu_name,
2032 primitive,
2033 mapped_primitive_params)
2034 # Sub-operations: Call _ns_execute_primitive() instead of action()
2035 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2036 # nsr_deployed = db_nsr["_admin"]["deployed"]
2037
2038 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2039 # nsr_id, nslcmop_terminate_action_id)
2040 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2041 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2042 # if result not in result_ok:
2043 # raise LcmException(
2044 # "terminate_primitive_action for vnf_member_index={}",
2045 # " primitive={} fails with error {}".format(
2046 # vnf_index, seq.get("name"), result_detail))
2047
2048 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2049 try:
2050 await self.n2vc.exec_primitive(
2051 ee_id=ee_id,
2052 primitive_name=primitive,
2053 params_dict=mapped_primitive_params
2054 )
2055 except Exception as e:
2056 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2057 raise LcmException(
2058 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2059 .format(vnf_index, seq.get("name"), e),
2060 )
2061
2062 async def terminate(self, nsr_id, nslcmop_id):
2063
2064 # Try to lock HA task here
2065 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2066 if not task_is_locked_by_me:
2067 return
2068
2069 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2070 self.logger.debug(logging_text + "Enter")
2071 db_nsr = None
2072 db_nslcmop = None
2073 exc = None
2074 failed_detail = [] # annotates all failed error messages
2075 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2076 "_admin.current-operation": nslcmop_id,
2077 "_admin.operation-type": "terminate"}
2078 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2079 db_nslcmop_update = {}
2080 nslcmop_operation_state = None
2081 autoremove = False # autoremove after terminated
2082 pending_tasks = []
2083 try:
2084 # wait for any previous tasks in process
2085 step = "Waiting for previous operations to terminate"
2086 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2087
2088 self._write_ns_status(
2089 nsr_id=nsr_id,
2090 ns_state="TERMINATING",
2091 current_operation="TERMINATING",
2092 current_operation_id=nslcmop_id
2093 )
2094
2095 step = "Getting nslcmop={} from db".format(nslcmop_id)
2096 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2097 step = "Getting nsr={} from db".format(nsr_id)
2098 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2099 # nsd = db_nsr["nsd"]
2100 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2101 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2102 return
2103 # #TODO check if VIM is creating and wait
2104 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2105 # Call internal terminate action
2106 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2107
2108 pending_tasks = []
2109
2110 db_nsr_update["operational-status"] = "terminating"
2111 db_nsr_update["config-status"] = "terminating"
2112
2113 # remove NS
2114 try:
2115 step = "delete execution environment"
2116 self.logger.debug(logging_text + step)
2117
2118 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2119 pending_tasks.append(task_delete_ee)
2120 except Exception as e:
2121 msg = "Failed while deleting NS in VCA: {}".format(e)
2122 self.logger.error(msg)
2123 failed_detail.append(msg)
2124
2125 try:
2126 # Delete from k8scluster
2127 step = "delete kdus"
2128 self.logger.debug(logging_text + step)
2129 # print(nsr_deployed)
2130 if nsr_deployed:
2131 for kdu in nsr_deployed.get("K8s", ()):
2132 kdu_instance = kdu.get("kdu-instance")
2133 if not kdu_instance:
2134 continue
2135 if kdu.get("k8scluster-type") == "chart":
2136 task_delete_kdu_instance = asyncio.ensure_future(
2137 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2138 kdu_instance=kdu_instance))
2139 elif kdu.get("k8scluster-type") == "juju":
2140 task_delete_kdu_instance = asyncio.ensure_future(
2141 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2142 kdu_instance=kdu_instance))
2143 else:
2144 self.error(logging_text + "Unknown k8s deployment type {}".
2145 format(kdu.get("k8scluster-type")))
2146 continue
2147 pending_tasks.append(task_delete_kdu_instance)
2148 except LcmException as e:
2149 msg = "Failed while deleting KDUs from NS: {}".format(e)
2150 self.logger.error(msg)
2151 failed_detail.append(msg)
2152
2153 # remove from RO
2154 RO_fail = False
2155
2156 # Delete ns
2157 RO_nsr_id = RO_delete_action = None
2158 if nsr_deployed and nsr_deployed.get("RO"):
2159 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2160 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2161 try:
2162 if RO_nsr_id:
2163 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2164 "Deleting ns from VIM"
2165 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2166 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2167 self.logger.debug(logging_text + step)
2168 desc = await self.RO.delete("ns", RO_nsr_id)
2169 RO_delete_action = desc["action_id"]
2170 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2171 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2172 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2173 if RO_delete_action:
2174 # wait until NS is deleted from VIM
2175 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2176 format(RO_nsr_id, RO_delete_action)
2177 detailed_status_old = None
2178 self.logger.debug(logging_text + step)
2179
2180 delete_timeout = 20 * 60 # 20 minutes
2181 while delete_timeout > 0:
2182 desc = await self.RO.show(
2183 "ns",
2184 item_id_name=RO_nsr_id,
2185 extra_item="action",
2186 extra_item_id=RO_delete_action)
2187 ns_status, ns_status_info = self.RO.check_action_status(desc)
2188 if ns_status == "ERROR":
2189 raise ROclient.ROClientException(ns_status_info)
2190 elif ns_status == "BUILD":
2191 detailed_status = step + "; {}".format(ns_status_info)
2192 elif ns_status == "ACTIVE":
2193 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2194 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2195 break
2196 else:
2197 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2198 if detailed_status != detailed_status_old:
2199 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2200 db_nsr_update["detailed-status"] = detailed_status
2201 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2202 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2203 await asyncio.sleep(5, loop=self.loop)
2204 delete_timeout -= 5
2205 else: # delete_timeout <= 0:
2206 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2207
2208 except ROclient.ROClientException as e:
2209 if e.http_code == 404: # not found
2210 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2211 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2212 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2213 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2214 elif e.http_code == 409: # conflict
2215 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2216 self.logger.debug(logging_text + failed_detail[-1])
2217 RO_fail = True
2218 else:
2219 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2220 self.logger.error(logging_text + failed_detail[-1])
2221 RO_fail = True
2222
2223 # Delete nsd
2224 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2225 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2226 try:
2227 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2228 "Deleting nsd from RO"
2229 await self.RO.delete("nsd", RO_nsd_id)
2230 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2231 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2232 except ROclient.ROClientException as e:
2233 if e.http_code == 404: # not found
2234 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2235 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2236 elif e.http_code == 409: # conflict
2237 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2238 self.logger.debug(logging_text + failed_detail[-1])
2239 RO_fail = True
2240 else:
2241 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2242 self.logger.error(logging_text + failed_detail[-1])
2243 RO_fail = True
2244
2245 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2246 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2247 if not vnf_deployed or not vnf_deployed["id"]:
2248 continue
2249 try:
2250 RO_vnfd_id = vnf_deployed["id"]
2251 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2252 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2253 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2254 await self.RO.delete("vnfd", RO_vnfd_id)
2255 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2256 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2257 except ROclient.ROClientException as e:
2258 if e.http_code == 404: # not found
2259 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2260 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2261 elif e.http_code == 409: # conflict
2262 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2263 self.logger.debug(logging_text + failed_detail[-1])
2264 else:
2265 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2266 self.logger.error(logging_text + failed_detail[-1])
2267
2268 if failed_detail:
2269 terminate_ok = False
2270 self.logger.error(logging_text + " ;".join(failed_detail))
2271 db_nsr_update["operational-status"] = "failed"
2272 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2273 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2274 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2275 db_nslcmop_update["statusEnteredTime"] = time()
2276 else:
2277 terminate_ok = True
2278 db_nsr_update["operational-status"] = "terminated"
2279 db_nsr_update["detailed-status"] = "Done"
2280 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2281 db_nslcmop_update["detailed-status"] = "Done"
2282 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2283 db_nslcmop_update["statusEnteredTime"] = time()
2284 if db_nslcmop["operationParams"].get("autoremove"):
2285 autoremove = True
2286
2287 except (ROclient.ROClientException, DbException, LcmException) as e:
2288 self.logger.error(logging_text + "Exit Exception {}".format(e))
2289 exc = e
2290 except asyncio.CancelledError:
2291 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2292 exc = "Operation was cancelled"
2293 except Exception as e:
2294 exc = traceback.format_exc()
2295 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2296 finally:
2297 if exc and db_nslcmop:
2298 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2299 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2300 db_nslcmop_update["statusEnteredTime"] = time()
2301 try:
2302 if db_nslcmop and db_nslcmop_update:
2303 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2304 if db_nsr:
2305 db_nsr_update["_admin.nslcmop"] = None
2306 db_nsr_update["_admin.current-operation"] = None
2307 db_nsr_update["_admin.operation-type"] = None
2308 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2309
2310 if terminate_ok:
2311 ns_state = "IDLE"
2312 error_description = None
2313 error_detail = None
2314 else:
2315 ns_state = "BROKEN"
2316 error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
2317 error_detail = "; ".join(failed_detail)
2318
2319 self._write_ns_status(
2320 nsr_id=nsr_id,
2321 ns_state=ns_state,
2322 current_operation="IDLE",
2323 current_operation_id=None,
2324 error_description=error_description,
2325 error_detail=error_detail
2326 )
2327
2328 except DbException as e:
2329 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2330 if nslcmop_operation_state:
2331 try:
2332 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2333 "operationState": nslcmop_operation_state,
2334 "autoremove": autoremove},
2335 loop=self.loop)
2336 except Exception as e:
2337 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2338
2339 # wait for pending tasks
2340 done = None
2341 pending = None
2342 if pending_tasks:
2343 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2344 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2345 if not pending:
2346 self.logger.debug(logging_text + 'All tasks finished...')
2347 else:
2348 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2349
2350 self.logger.debug(logging_text + "Exit")
2351 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2352
2353 @staticmethod
2354 def _map_primitive_params(primitive_desc, params, instantiation_params):
2355 """
2356 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2357 The default-value is used. If it is between < > it look for a value at instantiation_params
2358 :param primitive_desc: portion of VNFD/NSD that describes primitive
2359 :param params: Params provided by user
2360 :param instantiation_params: Instantiation params provided by user
2361 :return: a dictionary with the calculated params
2362 """
2363 calculated_params = {}
2364 for parameter in primitive_desc.get("parameter", ()):
2365 param_name = parameter["name"]
2366 if param_name in params:
2367 calculated_params[param_name] = params[param_name]
2368 elif "default-value" in parameter or "value" in parameter:
2369 if "value" in parameter:
2370 calculated_params[param_name] = parameter["value"]
2371 else:
2372 calculated_params[param_name] = parameter["default-value"]
2373 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2374 and calculated_params[param_name].endswith(">"):
2375 if calculated_params[param_name][1:-1] in instantiation_params:
2376 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2377 else:
2378 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2379 format(calculated_params[param_name], primitive_desc["name"]))
2380 else:
2381 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2382 format(param_name, primitive_desc["name"]))
2383
2384 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2385 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2386 width=256)
2387 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2388 calculated_params[param_name] = calculated_params[param_name][7:]
2389
2390 # add always ns_config_info if primitive name is config
2391 if primitive_desc["name"] == "config":
2392 if "ns_config_info" in instantiation_params:
2393 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2394 return calculated_params
2395
2396 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2397 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2398
2399 # find vca_deployed record for this action
2400 try:
2401 for vca_deployed in db_deployed["VCA"]:
2402 if not vca_deployed:
2403 continue
2404 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2405 continue
2406 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2407 continue
2408 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2409 continue
2410 break
2411 else:
2412 # vca_deployed not found
2413 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2414 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2415
2416 # get ee_id
2417 ee_id = vca_deployed.get("ee_id")
2418 if not ee_id:
2419 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2420 "execution environment"
2421 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2422
2423 if primitive == "config":
2424 primitive_params = {"params": primitive_params}
2425
2426 while retries >= 0:
2427 try:
2428 output = await self.n2vc.exec_primitive(
2429 ee_id=ee_id,
2430 primitive_name=primitive,
2431 params_dict=primitive_params
2432 )
2433 # execution was OK
2434 break
2435 except Exception as e:
2436 retries -= 1
2437 if retries >= 0:
2438 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2439 # wait and retry
2440 await asyncio.sleep(retries_interval, loop=self.loop)
2441 else:
2442 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2443
2444 return output, 'OK'
2445
2446 except Exception as e:
2447 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2448
2449 async def action(self, nsr_id, nslcmop_id):
2450
2451 # Try to lock HA task here
2452 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2453 if not task_is_locked_by_me:
2454 return
2455
2456 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2457 self.logger.debug(logging_text + "Enter")
2458 # get all needed from database
2459 db_nsr = None
2460 db_nslcmop = None
2461 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2462 "_admin.current-operation": nslcmop_id,
2463 "_admin.operation-type": "action"}
2464 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2465 db_nslcmop_update = {}
2466 nslcmop_operation_state = None
2467 nslcmop_operation_state_detail = None
2468 exc = None
2469 try:
2470 # wait for any previous tasks in process
2471 step = "Waiting for previous operations to terminate"
2472 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2473
2474 self._write_ns_status(
2475 nsr_id=nsr_id,
2476 ns_state=None,
2477 current_operation="RUNNING ACTION",
2478 current_operation_id=nslcmop_id
2479 )
2480
2481 step = "Getting information from database"
2482 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2483 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2484
2485 nsr_deployed = db_nsr["_admin"].get("deployed")
2486 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2487 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2488 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2489 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2490 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2491
2492 if vnf_index:
2493 step = "Getting vnfr from database"
2494 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2495 step = "Getting vnfd from database"
2496 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2497 else:
2498 if db_nsr.get("nsd"):
2499 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2500 else:
2501 step = "Getting nsd from database"
2502 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2503
2504 # for backward compatibility
2505 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2506 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2507 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2508 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2509
2510 primitive = db_nslcmop["operationParams"]["primitive"]
2511 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2512
2513 # look for primitive
2514 config_primitive_desc = None
2515 if vdu_id:
2516 for vdu in get_iterable(db_vnfd, "vdu"):
2517 if vdu_id == vdu["id"]:
2518 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2519 if config_primitive["name"] == primitive:
2520 config_primitive_desc = config_primitive
2521 break
2522 elif kdu_name:
2523 self.logger.debug(logging_text + "Checking actions in KDUs")
2524 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2525 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2526 if primitive_params:
2527 desc_params.update(primitive_params)
2528 # TODO Check if we will need something at vnf level
2529 index = 0
2530 for kdu in get_iterable(nsr_deployed, "K8s"):
2531 if kdu_name == kdu["kdu-name"]:
2532 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2533 "path": "_admin.deployed.K8s.{}".format(index)}
2534 if primitive == "upgrade":
2535 if desc_params.get("kdu_model"):
2536 kdu_model = desc_params.get("kdu_model")
2537 del desc_params["kdu_model"]
2538 else:
2539 kdu_model = kdu.get("kdu-model")
2540 parts = kdu_model.split(sep=":")
2541 if len(parts) == 2:
2542 kdu_model = parts[0]
2543
2544 if kdu.get("k8scluster-type") == "chart":
2545 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2546 kdu_instance=kdu.get("kdu-instance"),
2547 atomic=True, kdu_model=kdu_model,
2548 params=desc_params, db_dict=db_dict,
2549 timeout=300)
2550 elif kdu.get("k8scluster-type") == "juju":
2551 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2552 kdu_instance=kdu.get("kdu-instance"),
2553 atomic=True, kdu_model=kdu_model,
2554 params=desc_params, db_dict=db_dict,
2555 timeout=300)
2556
2557 else:
2558 msg = "k8scluster-type not defined"
2559 raise LcmException(msg)
2560
2561 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2562 break
2563 elif primitive == "rollback":
2564 if kdu.get("k8scluster-type") == "chart":
2565 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2566 kdu_instance=kdu.get("kdu-instance"),
2567 db_dict=db_dict)
2568 elif kdu.get("k8scluster-type") == "juju":
2569 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2570 kdu_instance=kdu.get("kdu-instance"),
2571 db_dict=db_dict)
2572 else:
2573 msg = "k8scluster-type not defined"
2574 raise LcmException(msg)
2575 break
2576 elif primitive == "status":
2577 if kdu.get("k8scluster-type") == "chart":
2578 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2579 kdu_instance=kdu.get("kdu-instance"))
2580 elif kdu.get("k8scluster-type") == "juju":
2581 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2582 kdu_instance=kdu.get("kdu-instance"))
2583 else:
2584 msg = "k8scluster-type not defined"
2585 raise LcmException(msg)
2586 break
2587 index += 1
2588
2589 else:
2590 raise LcmException("KDU '{}' not found".format(kdu_name))
2591 if output:
2592 db_nslcmop_update["detailed-status"] = output
2593 db_nslcmop_update["operationState"] = 'COMPLETED'
2594 db_nslcmop_update["statusEnteredTime"] = time()
2595 else:
2596 db_nslcmop_update["detailed-status"] = ''
2597 db_nslcmop_update["operationState"] = 'FAILED'
2598 db_nslcmop_update["statusEnteredTime"] = time()
2599 return
2600 elif vnf_index:
2601 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2602 if config_primitive["name"] == primitive:
2603 config_primitive_desc = config_primitive
2604 break
2605 else:
2606 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2607 if config_primitive["name"] == primitive:
2608 config_primitive_desc = config_primitive
2609 break
2610
2611 if not config_primitive_desc:
2612 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2613 format(primitive))
2614
2615 desc_params = {}
2616 if vnf_index:
2617 if db_vnfr.get("additionalParamsForVnf"):
2618 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2619 if vdu_id:
2620 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2621 if vdur.get("additionalParams"):
2622 desc_params = self._format_additional_params(vdur["additionalParams"])
2623 else:
2624 if db_nsr.get("additionalParamsForNs"):
2625 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2626
2627 # TODO check if ns is in a proper status
2628 output, detail = await self._ns_execute_primitive(
2629 db_deployed=nsr_deployed,
2630 member_vnf_index=vnf_index,
2631 vdu_id=vdu_id,
2632 vdu_name=vdu_name,
2633 vdu_count_index=vdu_count_index,
2634 primitive=primitive,
2635 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2636
2637 detailed_status = output
2638 if detail == 'OK':
2639 result = 'COMPLETED'
2640 else:
2641 result = 'FAILED'
2642
2643 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2644 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2645 db_nslcmop_update["statusEnteredTime"] = time()
2646 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2647 return # database update is called inside finally
2648
2649 except (DbException, LcmException) as e:
2650 self.logger.error(logging_text + "Exit Exception {}".format(e))
2651 exc = e
2652 except asyncio.CancelledError:
2653 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2654 exc = "Operation was cancelled"
2655 except Exception as e:
2656 exc = traceback.format_exc()
2657 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2658 finally:
2659 if exc and db_nslcmop:
2660 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2661 "FAILED {}: {}".format(step, exc)
2662 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2663 db_nslcmop_update["statusEnteredTime"] = time()
2664 try:
2665 if db_nslcmop_update:
2666 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2667 if db_nsr:
2668 db_nsr_update["_admin.nslcmop"] = None
2669 db_nsr_update["_admin.operation-type"] = None
2670 db_nsr_update["_admin.nslcmop"] = None
2671 db_nsr_update["_admin.current-operation"] = None
2672 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2673 self._write_ns_status(
2674 nsr_id=nsr_id,
2675 ns_state=None,
2676 current_operation="IDLE",
2677 current_operation_id=None
2678 )
2679 except DbException as e:
2680 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2681 self.logger.debug(logging_text + "Exit")
2682 if nslcmop_operation_state:
2683 try:
2684 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2685 "operationState": nslcmop_operation_state},
2686 loop=self.loop)
2687 except Exception as e:
2688 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2689 self.logger.debug(logging_text + "Exit")
2690 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2691 return nslcmop_operation_state, nslcmop_operation_state_detail
2692
2693 async def scale(self, nsr_id, nslcmop_id):
2694
2695 # Try to lock HA task here
2696 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2697 if not task_is_locked_by_me:
2698 return
2699
2700 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2701 self.logger.debug(logging_text + "Enter")
2702 # get all needed from database
2703 db_nsr = None
2704 db_nslcmop = None
2705 db_nslcmop_update = {}
2706 nslcmop_operation_state = None
2707 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2708 "_admin.current-operation": nslcmop_id,
2709 "_admin.operation-type": "scale"}
2710 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2711 exc = None
2712 # in case of error, indicates what part of scale was failed to put nsr at error status
2713 scale_process = None
2714 old_operational_status = ""
2715 old_config_status = ""
2716 vnfr_scaled = False
2717 try:
2718 # wait for any previous tasks in process
2719 step = "Waiting for previous operations to terminate"
2720 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2721
2722 self._write_ns_status(
2723 nsr_id=nsr_id,
2724 ns_state=None,
2725 current_operation="SCALING",
2726 current_operation_id=nslcmop_id
2727 )
2728
2729 step = "Getting nslcmop from database"
2730 self.logger.debug(step + " after having waited for previous tasks to be completed")
2731 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2732 step = "Getting nsr from database"
2733 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2734
2735 old_operational_status = db_nsr["operational-status"]
2736 old_config_status = db_nsr["config-status"]
2737 step = "Parsing scaling parameters"
2738 # self.logger.debug(step)
2739 db_nsr_update["operational-status"] = "scaling"
2740 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2741 nsr_deployed = db_nsr["_admin"].get("deployed")
2742
2743 #######
2744 nsr_deployed = db_nsr["_admin"].get("deployed")
2745 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2746 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2747 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2748 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2749 #######
2750
2751 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2752 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2753 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2754 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2755 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2756
2757 # for backward compatibility
2758 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2759 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2760 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2761 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2762
2763 step = "Getting vnfr from database"
2764 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2765 step = "Getting vnfd from database"
2766 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2767
2768 step = "Getting scaling-group-descriptor"
2769 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2770 if scaling_descriptor["name"] == scaling_group:
2771 break
2772 else:
2773 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2774 "at vnfd:scaling-group-descriptor".format(scaling_group))
2775
2776 # cooldown_time = 0
2777 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2778 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2779 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2780 # break
2781
2782 # TODO check if ns is in a proper status
2783 step = "Sending scale order to VIM"
2784 nb_scale_op = 0
2785 if not db_nsr["_admin"].get("scaling-group"):
2786 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2787 admin_scale_index = 0
2788 else:
2789 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2790 if admin_scale_info["name"] == scaling_group:
2791 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2792 break
2793 else: # not found, set index one plus last element and add new entry with the name
2794 admin_scale_index += 1
2795 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2796 RO_scaling_info = []
2797 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2798 if scaling_type == "SCALE_OUT":
2799 # count if max-instance-count is reached
2800 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2801 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2802 if nb_scale_op >= max_instance_count:
2803 raise LcmException("reached the limit of {} (max-instance-count) "
2804 "scaling-out operations for the "
2805 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2806
2807 nb_scale_op += 1
2808 vdu_scaling_info["scaling_direction"] = "OUT"
2809 vdu_scaling_info["vdu-create"] = {}
2810 for vdu_scale_info in scaling_descriptor["vdu"]:
2811 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2812 "type": "create", "count": vdu_scale_info.get("count", 1)})
2813 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2814
2815 elif scaling_type == "SCALE_IN":
2816 # count if min-instance-count is reached
2817 min_instance_count = 0
2818 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2819 min_instance_count = int(scaling_descriptor["min-instance-count"])
2820 if nb_scale_op <= min_instance_count:
2821 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2822 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2823 nb_scale_op -= 1
2824 vdu_scaling_info["scaling_direction"] = "IN"
2825 vdu_scaling_info["vdu-delete"] = {}
2826 for vdu_scale_info in scaling_descriptor["vdu"]:
2827 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2828 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2829 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2830
2831 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2832 vdu_create = vdu_scaling_info.get("vdu-create")
2833 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2834 if vdu_scaling_info["scaling_direction"] == "IN":
2835 for vdur in reversed(db_vnfr["vdur"]):
2836 if vdu_delete.get(vdur["vdu-id-ref"]):
2837 vdu_delete[vdur["vdu-id-ref"]] -= 1
2838 vdu_scaling_info["vdu"].append({
2839 "name": vdur["name"],
2840 "vdu_id": vdur["vdu-id-ref"],
2841 "interface": []
2842 })
2843 for interface in vdur["interfaces"]:
2844 vdu_scaling_info["vdu"][-1]["interface"].append({
2845 "name": interface["name"],
2846 "ip_address": interface["ip-address"],
2847 "mac_address": interface.get("mac-address"),
2848 })
2849 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2850
2851 # PRE-SCALE BEGIN
2852 step = "Executing pre-scale vnf-config-primitive"
2853 if scaling_descriptor.get("scaling-config-action"):
2854 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2855 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2856 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2857 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2858 step = db_nslcmop_update["detailed-status"] = \
2859 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2860
2861 # look for primitive
2862 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2863 if config_primitive["name"] == vnf_config_primitive:
2864 break
2865 else:
2866 raise LcmException(
2867 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2868 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2869 "primitive".format(scaling_group, config_primitive))
2870
2871 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2872 if db_vnfr.get("additionalParamsForVnf"):
2873 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2874
2875 scale_process = "VCA"
2876 db_nsr_update["config-status"] = "configuring pre-scaling"
2877 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2878
2879 # Pre-scale reintent check: Check if this sub-operation has been executed before
2880 op_index = self._check_or_add_scale_suboperation(
2881 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2882 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2883 # Skip sub-operation
2884 result = 'COMPLETED'
2885 result_detail = 'Done'
2886 self.logger.debug(logging_text +
2887 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2888 vnf_config_primitive, result, result_detail))
2889 else:
2890 if (op_index == self.SUBOPERATION_STATUS_NEW):
2891 # New sub-operation: Get index of this sub-operation
2892 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2893 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2894 format(vnf_config_primitive))
2895 else:
2896 # Reintent: Get registered params for this existing sub-operation
2897 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2898 vnf_index = op.get('member_vnf_index')
2899 vnf_config_primitive = op.get('primitive')
2900 primitive_params = op.get('primitive_params')
2901 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2902 format(vnf_config_primitive))
2903 # Execute the primitive, either with new (first-time) or registered (reintent) args
2904 result, result_detail = await self._ns_execute_primitive(
2905 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2906 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2907 vnf_config_primitive, result, result_detail))
2908 # Update operationState = COMPLETED | FAILED
2909 self._update_suboperation_status(
2910 db_nslcmop, op_index, result, result_detail)
2911
2912 if result == "FAILED":
2913 raise LcmException(result_detail)
2914 db_nsr_update["config-status"] = old_config_status
2915 scale_process = None
2916 # PRE-SCALE END
2917
2918 # SCALE RO - BEGIN
2919 # Should this block be skipped if 'RO_nsr_id' == None ?
2920 # if (RO_nsr_id and RO_scaling_info):
2921 if RO_scaling_info:
2922 scale_process = "RO"
2923 # Scale RO reintent check: Check if this sub-operation has been executed before
2924 op_index = self._check_or_add_scale_suboperation(
2925 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2926 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2927 # Skip sub-operation
2928 result = 'COMPLETED'
2929 result_detail = 'Done'
2930 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2931 result, result_detail))
2932 else:
2933 if (op_index == self.SUBOPERATION_STATUS_NEW):
2934 # New sub-operation: Get index of this sub-operation
2935 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2936 self.logger.debug(logging_text + "New sub-operation RO")
2937 else:
2938 # Reintent: Get registered params for this existing sub-operation
2939 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2940 RO_nsr_id = op.get('RO_nsr_id')
2941 RO_scaling_info = op.get('RO_scaling_info')
2942 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2943 vnf_config_primitive))
2944
2945 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2946 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2947 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2948 # wait until ready
2949 RO_nslcmop_id = RO_desc["instance_action_id"]
2950 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2951
2952 RO_task_done = False
2953 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2954 detailed_status_old = None
2955 self.logger.debug(logging_text + step)
2956
2957 deployment_timeout = 1 * 3600 # One hour
2958 while deployment_timeout > 0:
2959 if not RO_task_done:
2960 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2961 extra_item_id=RO_nslcmop_id)
2962 ns_status, ns_status_info = self.RO.check_action_status(desc)
2963 if ns_status == "ERROR":
2964 raise ROclient.ROClientException(ns_status_info)
2965 elif ns_status == "BUILD":
2966 detailed_status = step + "; {}".format(ns_status_info)
2967 elif ns_status == "ACTIVE":
2968 RO_task_done = True
2969 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2970 self.logger.debug(logging_text + step)
2971 else:
2972 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2973 else:
2974
2975 if ns_status == "ERROR":
2976 raise ROclient.ROClientException(ns_status_info)
2977 elif ns_status == "BUILD":
2978 detailed_status = step + "; {}".format(ns_status_info)
2979 elif ns_status == "ACTIVE":
2980 step = detailed_status = \
2981 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2982 if not vnfr_scaled:
2983 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2984 vnfr_scaled = True
2985 try:
2986 desc = await self.RO.show("ns", RO_nsr_id)
2987 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2988 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2989 break
2990 except LcmExceptionNoMgmtIP:
2991 pass
2992 else:
2993 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2994 if detailed_status != detailed_status_old:
2995 self._update_suboperation_status(
2996 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2997 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2998 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2999
3000 await asyncio.sleep(5, loop=self.loop)
3001 deployment_timeout -= 5
3002 if deployment_timeout <= 0:
3003 self._update_suboperation_status(
3004 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3005 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3006
3007 # update VDU_SCALING_INFO with the obtained ip_addresses
3008 if vdu_scaling_info["scaling_direction"] == "OUT":
3009 for vdur in reversed(db_vnfr["vdur"]):
3010 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3011 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3012 vdu_scaling_info["vdu"].append({
3013 "name": vdur["name"],
3014 "vdu_id": vdur["vdu-id-ref"],
3015 "interface": []
3016 })
3017 for interface in vdur["interfaces"]:
3018 vdu_scaling_info["vdu"][-1]["interface"].append({
3019 "name": interface["name"],
3020 "ip_address": interface["ip-address"],
3021 "mac_address": interface.get("mac-address"),
3022 })
3023 del vdu_scaling_info["vdu-create"]
3024
3025 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3026 # SCALE RO - END
3027
3028 scale_process = None
3029 if db_nsr_update:
3030 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3031
3032 # POST-SCALE BEGIN
3033 # execute primitive service POST-SCALING
3034 step = "Executing post-scale vnf-config-primitive"
3035 if scaling_descriptor.get("scaling-config-action"):
3036 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3037 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3038 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3039 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3040 step = db_nslcmop_update["detailed-status"] = \
3041 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3042
3043 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3044 if db_vnfr.get("additionalParamsForVnf"):
3045 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3046
3047 # look for primitive
3048 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3049 if config_primitive["name"] == vnf_config_primitive:
3050 break
3051 else:
3052 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3053 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3054 "match any vnf-configuration:config-primitive".format(scaling_group,
3055 config_primitive))
3056 scale_process = "VCA"
3057 db_nsr_update["config-status"] = "configuring post-scaling"
3058 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3059
3060 # Post-scale reintent check: Check if this sub-operation has been executed before
3061 op_index = self._check_or_add_scale_suboperation(
3062 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3063 if op_index == self.SUBOPERATION_STATUS_SKIP:
3064 # Skip sub-operation
3065 result = 'COMPLETED'
3066 result_detail = 'Done'
3067 self.logger.debug(logging_text +
3068 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3069 format(vnf_config_primitive, result, result_detail))
3070 else:
3071 if op_index == self.SUBOPERATION_STATUS_NEW:
3072 # New sub-operation: Get index of this sub-operation
3073 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3074 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3075 format(vnf_config_primitive))
3076 else:
3077 # Reintent: Get registered params for this existing sub-operation
3078 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3079 vnf_index = op.get('member_vnf_index')
3080 vnf_config_primitive = op.get('primitive')
3081 primitive_params = op.get('primitive_params')
3082 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3083 format(vnf_config_primitive))
3084 # Execute the primitive, either with new (first-time) or registered (reintent) args
3085 result, result_detail = await self._ns_execute_primitive(
3086 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3087 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3088 vnf_config_primitive, result, result_detail))
3089 # Update operationState = COMPLETED | FAILED
3090 self._update_suboperation_status(
3091 db_nslcmop, op_index, result, result_detail)
3092
3093 if result == "FAILED":
3094 raise LcmException(result_detail)
3095 db_nsr_update["config-status"] = old_config_status
3096 scale_process = None
3097 # POST-SCALE END
3098
3099 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3100 db_nslcmop_update["statusEnteredTime"] = time()
3101 db_nslcmop_update["detailed-status"] = "done"
3102 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3103 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3104 else old_operational_status
3105 db_nsr_update["config-status"] = old_config_status
3106 return
3107 except (ROclient.ROClientException, DbException, LcmException) as e:
3108 self.logger.error(logging_text + "Exit Exception {}".format(e))
3109 exc = e
3110 except asyncio.CancelledError:
3111 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3112 exc = "Operation was cancelled"
3113 except Exception as e:
3114 exc = traceback.format_exc()
3115 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3116 finally:
3117 if exc:
3118 if db_nslcmop:
3119 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3120 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3121 db_nslcmop_update["statusEnteredTime"] = time()
3122 if db_nsr:
3123 db_nsr_update["operational-status"] = old_operational_status
3124 db_nsr_update["config-status"] = old_config_status
3125 db_nsr_update["detailed-status"] = ""
3126 db_nsr_update["_admin.nslcmop"] = None
3127 if scale_process:
3128 if "VCA" in scale_process:
3129 db_nsr_update["config-status"] = "failed"
3130 if "RO" in scale_process:
3131 db_nsr_update["operational-status"] = "failed"
3132 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3133 exc)
3134 try:
3135 if db_nslcmop and db_nslcmop_update:
3136 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3137 if db_nsr:
3138 db_nsr_update["_admin.current-operation"] = None
3139 db_nsr_update["_admin.operation-type"] = None
3140 db_nsr_update["_admin.nslcmop"] = None
3141 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3142
3143 self._write_ns_status(
3144 nsr_id=nsr_id,
3145 ns_state=None,
3146 current_operation="IDLE",
3147 current_operation_id=None
3148 )
3149
3150 except DbException as e:
3151 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3152 if nslcmop_operation_state:
3153 try:
3154 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3155 "operationState": nslcmop_operation_state},
3156 loop=self.loop)
3157 # if cooldown_time:
3158 # await asyncio.sleep(cooldown_time, loop=self.loop)
3159 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3160 except Exception as e:
3161 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3162 self.logger.debug(logging_text + "Exit")
3163 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")