Bug 1007 - Pass NS Id and KDU Name to Juju k8s
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
class NsLcm(LcmBase):
    """
    Network service (ns) handling built on top of LcmBase.

    The constructor creates the N2VC juju connector, the helm and juju k8s connectors and the RO client that
    the methods of this class use.
    """

    # Default timeouts. NOTE(review): the values look like seconds (N * 60, N * 3600) -- confirm with the users
    timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
    # default global timeout for deployment a ns; used when neither the operation params nor the
    # configuration ("timeout" -> "ns_deploy") provide a value
    timeout_ns_deploy = 2 * 3600
    timeout_charm_delete = 10 * 60
    timeout_primitive = 10 * 60  # timeout for primitive execution

    # Negative markers for sub-operation lookups.
    # NOTE(review): presumably negative so that they cannot be confused with a list index -- confirm at the callers
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
55
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Initialize the base class and create every external client used by this object

    :param db: database handler, given to LcmBase
    :param msg: messaging handler, given to LcmBase
    :param fs: filesystem storage handler, given to LcmBase
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read here
    :param loop: event loop, given to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy of the VCA section
    vca_settings = config["VCA"].copy()
    self.vca_config = vca_settings

    # N2VC connector; it calls back _on_update_n2vc_db
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url='{}:{}'.format(vca_settings['host'], vca_settings['port']),
        username=vca_settings.get('user', None),
        vca_config=vca_settings,
        on_update_db=self._on_update_n2vc_db
    )

    # k8s connector for helm
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=vca_settings.get("kubectlpath"),
        helm_command=vca_settings.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # k8s connector for juju
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=vca_settings.get("kubectlpath"),
        juju_command=vca_settings.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # RO client, configured with the whole 'ro_config' section
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
107
108 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
109
110 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
111
112 try:
113 # TODO filter RO descriptor fields...
114
115 # write to database
116 db_dict = dict()
117 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
118 db_dict['deploymentStatus'] = ro_descriptor
119 self.update_db_2("nsrs", nsrs_id, db_dict)
120
121 except Exception as e:
122 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback registered at the N2VC connector (on_update_db). Refreshes the ns record with the VCA status

    It reads the nsrs record, asks N2VC for the current status and writes back:
      - 'vcaStatus': the status obtained from N2VC
      - 'configurationStatus.<index>.status': set to 'READY' or 'BROKEN' when it does not agree with the status
        of the deployed VCA at that index
      - 'errorDescription' and 'nsState': a ns at 'READY' becomes 'DEGRADED' when some machine or application
        reports a not healthy status, and a 'DEGRADED' one goes back to 'READY' when all is healthy again
    This is best effort: any error is logged and never propagated.

    :param table: not used; the 'nsrs' table is always read and written
    :param filter: database filter of the ns record. Its '_id' is used as the ns id
    :param path: dotted path of the updated item. The text after the last dot is taken as the VCA index
    :param updated_data: not used
    :return: None
    """
    # remove last dot from path (if exists)
    if path.endswith('.'):
        path = path[:-1]

    # bound before the try block, so that the error handler at the end can always log it
    nsr_id = None
    try:
        nsr_id = filter.get('_id')

        # read ns record from database
        nsr = self.db.get_one(table='nsrs', q_filter=filter)
        current_ns_status = nsr.get('nsState')

        # get vca status for NS
        status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)

        # vcaStatus
        db_dict = {'vcaStatus': status_dict}

        # update configurationStatus for this VCA
        try:
            vca_index = int(path[path.rfind(".")+1:])

            vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
            vca_status = vca_list[vca_index].get('status')

            configuration_status_list = nsr.get('configurationStatus')
            config_status = configuration_status_list[vca_index].get('status')

            # db_dict is a flat dictionary of dotted paths. Indexing db_dict['configurationStatus'] raised
            # always a KeyError that was silently ignored, so this status was never written
            config_status_key = 'configurationStatus.{}.status'.format(vca_index)
            if config_status == 'BROKEN' and vca_status != 'failed':
                db_dict[config_status_key] = 'READY'
            elif config_status != 'BROKEN' and vca_status == 'failed':
                db_dict[config_status_key] = 'BROKEN'
        except Exception as e:
            # not update configurationStatus (e.g. the path does not end with a VCA index)
            self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        if current_ns_status in ('READY', 'DEGRADED'):
            is_degraded = False
            error_description = ''
            # check machines
            for machine_id, machine in (status_dict.get('machines') or {}).items():
                # check machine agent-status
                if machine.get('agent-status'):
                    s = machine.get('agent-status').get('status')
                    if s != 'started':
                        is_degraded = True
                        error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
                # check machine instance status
                if machine.get('instance-status'):
                    s = machine.get('instance-status').get('status')
                    if s != 'running':
                        is_degraded = True
                        error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
            # check applications
            for app_id, app in (status_dict.get('applications') or {}).items():
                # check application status
                if app.get('status'):
                    s = app.get('status').get('status')
                    if s != 'active':
                        is_degraded = True
                        error_description += 'application {} status={} ; '.format(app_id, s)

            if error_description:
                db_dict['errorDescription'] = error_description
            if current_ns_status == 'READY' and is_degraded:
                db_dict['nsState'] = 'DEGRADED'
            if current_ns_status == 'DEGRADED' and not is_degraded:
                db_dict['nsState'] = 'READY'

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except Exception as e:
        # Logger.warn is a deprecated alias; use Logger.warning
        self.logger.warning('Error updating NS state for ns={}: {}'.format(nsr_id, e))

    return
212
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Builds the vnfd descriptor for RO from an OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: when provided, it replaces the 'id' of the generated descriptor
    :param additionalParams: instantiation params of the VNF, used to render the cloud-init Jinja2 templates
    :param nsrId: Id of the NSR. Not used here
    :return: the generated descriptor, a modified copy of vnfd
    :raises LcmException: if a cloud-init file cannot be read, the cloud-init content is not a valid template or
        a variable of the template is not present at additionalParams
    """
    try:
        ro_vnfd = deepcopy(vnfd)
        # RO does not use the internal keys nor the configuration, monitoring, scaling and k8s sections
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            ro_vnfd.pop(unused_key, None)
        if new_id:
            ro_vnfd["id"] = new_id

        # render with Jinja2 the cloud-init or cloud-init-file of every vdu
        for vdu in get_iterable(ro_vnfd, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file is read from the package storage and replaced by its rendered content
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_text = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_text = vdu["cloud-init"]
            else:
                # nothing to render at this vdu
                continue

            # every variable used by the template must come at the instantiation parameters
            template_vars = meta.find_undeclared_variables(Environment().parse(cloud_init_text))
            for var in template_vars:
                if not additionalParams or var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_text).render(additionalParams or {})

        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
270
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to relate member-vnf-index and vnfd
    :param vnfd_dict: dictionary of vnf descriptors, indexed by the 'vnfd-id-ref' used at nsd:constituent-vnfd
    :param n2vc_key_list: content set at 'mgmt_keys' of every vdu that requires ssh access. Can be None
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM or WIM account is not ENABLED, or a parameter references a
        member-vnf-index, internal connection point or connection point not present at the descriptors
    """
    # caches of OSM account id -> RO id, filled by the two functions below, to read the database only once
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # returns the RO id stored at the database for this VIM account. The VIM must be ENABLED
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # a string is taken as a WIM account id and converted to the RO id; any other value is returned as it is
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile with the keys and values renamed to the ones used here for RO:
        # dns-server -> dns-address, ipv4/ipv6 -> IPv4/IPv6, dhcp-params -> dhcp
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject n2vc_key_list as 'mgmt_keys' of the vdus whose configuration requires ssh access
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                # the management vdu is given directly, or indirectly by the management connection point
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # look for the vdu with an interface connected to the management connection point
                for vdu_interface in vdu.get("interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        # only the first vdu found is used
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # the same vnfd can be used by several members of the ns
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    # parameters provided per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            # for/else: the loop ended without finding the member-vnf-index
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        # also leave the vdu loop, so that its else clause is not executed
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # parameters provided per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            # NOTE(review): the trailing comma only wraps the result of the call in a discarded tuple
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"])),
        if vld_params.get("vim-network-name"):
            # a dictionary maps every VIM account to its network; a string is a network without datacenter
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            # same processing as vim-network-name, written at the same 'sites' key
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # here vdu_descriptor and interface_descriptor are the ones matched by the loops above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
541
542 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
543 # make a copy to do not change
544 vdu_create = copy(vdu_create)
545 vdu_delete = copy(vdu_delete)
546
547 vdurs = db_vnfr.get("vdur")
548 if vdurs is None:
549 vdurs = []
550 vdu_index = len(vdurs)
551 while vdu_index:
552 vdu_index -= 1
553 vdur = vdurs[vdu_index]
554 if vdur.get("pdu-type"):
555 continue
556 vdu_id_ref = vdur["vdu-id-ref"]
557 if vdu_create and vdu_create.get(vdu_id_ref):
558 for index in range(0, vdu_create[vdu_id_ref]):
559 vdur = deepcopy(vdur)
560 vdur["_id"] = str(uuid4())
561 vdur["count-index"] += 1
562 vdurs.insert(vdu_index+1+index, vdur)
563 del vdu_create[vdu_id_ref]
564 if vdu_delete and vdu_delete.get(vdu_id_ref):
565 del vdurs[vdu_index]
566 vdu_delete[vdu_id_ref] -= 1
567 if not vdu_delete[vdu_id_ref]:
568 del vdu_delete[vdu_id_ref]
569 # check all operations are done
570 if vdu_create or vdu_delete:
571 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
572 vdu_create))
573 if vdu_delete:
574 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
575 vdu_delete))
576
577 vnfr_update = {"vdur": vdurs}
578 db_vnfr["vdur"] = vdurs
579 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
580
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills every vld of the nsr with the info of its net at the RO descriptor
    :param ns_update_nsr: dictionary where the modified vld are added with the key 'vld.<index>'
    :param db_nsr: nsr content. Its vld items are modified too
    :param nsr_desc_RO: nsr descriptor obtained from RO. Its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised if some vld is not found at the RO nets
    """
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, if any
        ro_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld.update({
            "vim-id": ro_net.get("vim_net_id"),
            "name": ro_net.get("vim_name"),
            "status": ro_net.get("status"),
            "status-detailed": ro_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
602
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when neither RO nor the
        vnfr provide an ip address for a vnf
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf of this member; the else of this loop raises if it is not found
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # changes to write at the database for this vnfr
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the text before the first ';' is used
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with the same vdu-id-ref already skipped; the vdur with count-index N is
                # matched with the RO vm found after skipping N of them
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # the interfaces are matched by the 'internal_name' given by RO
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # for/else: no RO vm matches this vdur
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vld of the vnf, matched by the 'vnf_net_osm_id' given by RO
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
674
675 def _get_ns_config_info(self, nsr_id):
676 """
677 Generates a mapping between vnf,vdu elements and the N2VC id
678 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
679 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
680 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
681 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
682 """
683 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
684 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
685 mapping = {}
686 ns_config_info = {"osm-config-mapping": mapping}
687 for vca in vca_deployed_list:
688 if not vca["member-vnf-index"]:
689 continue
690 if not vca["vdu_id"]:
691 mapping[vca["member-vnf-index"]] = vca["application"]
692 else:
693 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
694 vca["application"]
695 return ns_config_info
696
697 @staticmethod
698 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
699 """
700 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
701 primitives as verify-ssh-credentials, or config when needed
702 :param desc_primitive_list: information of the descriptor
703 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
704 this element contains a ssh public key
705 :return: The modified list. Can ba an empty list, but always a list
706 """
707 if desc_primitive_list:
708 primitive_list = desc_primitive_list.copy()
709 else:
710 primitive_list = []
711 # look for primitive config, and get the position. None if not present
712 config_position = None
713 for index, primitive in enumerate(primitive_list):
714 if primitive["name"] == "config":
715 config_position = index
716 break
717
718 # for NS, add always a config primitive if not present (bug 874)
719 if not vca_deployed["member-vnf-index"] and config_position is None:
720 primitive_list.insert(0, {"name": "config", "parameter": []})
721 config_position = 0
722 # for VNF/VDU add verify-ssh-credentials after config
723 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
724 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
725 return primitive_list
726
727 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
728 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
729
730 db_nsr_update = {}
731 RO_descriptor_number = 0 # number of descriptors created at RO
732 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
733 start_deploy = time()
734 vdu_flag = False # If any of the VNFDs has VDUs
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # deploy RO
742
743 # get vnfds, instantiate at RO
744
745 for c_vnf in nsd.get("constituent-vnfd", ()):
746 member_vnf_index = c_vnf["member-vnf-index"]
747 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
748 if vnfd.get("vdu"):
749 vdu_flag = True
750 vnfd_ref = vnfd["id"]
751 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
752 " RO".format(vnfd_ref, member_vnf_index)
753 # self.logger.debug(logging_text + step)
754 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
755 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
756 RO_descriptor_number += 1
757
758 # look position at deployed.RO.vnfd if not present it will be appended at the end
759 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
760 if vnf_deployed["member-vnf-index"] == member_vnf_index:
761 break
762 else:
763 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
764 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
765
766 # look if present
767 RO_update = {"member-vnf-index": member_vnf_index}
768 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
769 if vnfd_list:
770 RO_update["id"] = vnfd_list[0]["uuid"]
771 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
772 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
773 else:
774 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
775 get("additionalParamsForVnf"), nsr_id)
776 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
777 RO_update["id"] = desc["uuid"]
778 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
779 vnfd_ref, member_vnf_index, desc["uuid"]))
780 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
781 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
782 self.update_db_2("nsrs", nsr_id, db_nsr_update)
783
784 # create nsd at RO
785 nsd_ref = nsd["id"]
786 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
787 # self.logger.debug(logging_text + step)
788
789 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
790 RO_descriptor_number += 1
791 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
792 if nsd_list:
793 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
794 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
795 nsd_ref, RO_nsd_uuid))
796 else:
797 nsd_RO = deepcopy(nsd)
798 nsd_RO["id"] = RO_osm_nsd_id
799 nsd_RO.pop("_id", None)
800 nsd_RO.pop("_admin", None)
801 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
802 member_vnf_index = c_vnf["member-vnf-index"]
803 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
804 for c_vld in nsd_RO.get("vld", ()):
805 for cp in c_vld.get("vnfd-connection-point-ref", ()):
806 member_vnf_index = cp["member-vnf-index-ref"]
807 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
808
809 desc = await self.RO.create("nsd", descriptor=nsd_RO)
810 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
811 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
812 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
813 self.update_db_2("nsrs", nsr_id, db_nsr_update)
814
815 # Create ns at RO
816 # if present use it unless in error status
817 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
818 if RO_nsr_id:
819 try:
820 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
821 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
822 desc = await self.RO.show("ns", RO_nsr_id)
823
824 except ROclient.ROClientException as e:
825 if e.http_code != HTTPStatus.NOT_FOUND:
826 raise
827 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
828 if RO_nsr_id:
829 ns_status, ns_status_info = self.RO.check_ns_status(desc)
830 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
831 if ns_status == "ERROR":
832 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
833 .format(RO_nsr_id)
834 self.logger.debug(logging_text + step)
835 await self.RO.delete("ns", RO_nsr_id)
836 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
837 if not RO_nsr_id:
838 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
839 # self.logger.debug(logging_text + step)
840
841 # check if VIM is creating and wait look if previous tasks in process
842 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
843 if task_dependency:
844 step = "Waiting for related tasks to be completed: {}".format(task_name)
845 self.logger.debug(logging_text + step)
846 await asyncio.wait(task_dependency, timeout=3600)
847 if ns_params.get("vnf"):
848 for vnf in ns_params["vnf"]:
849 if "vimAccountId" in vnf:
850 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
851 vnf["vimAccountId"])
852 if task_dependency:
853 step = "Waiting for related tasks to be completed: {}".format(task_name)
854 self.logger.debug(logging_text + step)
855 await asyncio.wait(task_dependency, timeout=3600)
856
857 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
858
859 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
860
861 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
862 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
863 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
864 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
865 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
866 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
867 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
868 self.update_db_2("nsrs", nsr_id, db_nsr_update)
869
870 # wait until NS is ready
871 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
872 detailed_status_old = None
873 self.logger.debug(logging_text + step)
874
875 old_desc = None
876 while time() <= start_deploy + timeout_ns_deploy:
877 desc = await self.RO.show("ns", RO_nsr_id)
878
879 # deploymentStatus
880 if desc != old_desc:
881 # desc has changed => update db
882 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
883 old_desc = desc
884
885 ns_status, ns_status_info = self.RO.check_ns_status(desc)
886 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
887 if ns_status == "ERROR":
888 raise ROclient.ROClientException(ns_status_info)
889 elif ns_status == "BUILD":
890 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
891 elif ns_status == "ACTIVE":
892 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
893 try:
894 if vdu_flag:
895 self.ns_update_vnfr(db_vnfrs, desc)
896 break
897 except LcmExceptionNoMgmtIP:
898 pass
899 else:
900 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
901 if detailed_status != detailed_status_old:
902 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 await asyncio.sleep(5, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise ROclient.ROClientException("Timeout waiting ns to be ready")
907
908 step = "Updating NSR"
909 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
910
911 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
912 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
913 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
915 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
916
917 step = "Deployed at VIM"
918 self.logger.debug(logging_text + step)
919
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for the ip address at RO and, optionally, insert a public key in the virtual machine.

    The method polls the database every 10 seconds until:
      1. the nsr record contains _admin.deployed.RO.nsr_id,
      2. the target vdur of the vnfr is in status "ACTIVE" and has an "ip-address",
      3. (only when both pub_key and user are provided) RO accepts the 'add_public_key' action.

    :param logging_text: prefix use for logging
    :param nsr_id: _id of the nsrs record to look the RO ns id into
    :param vnfr_id: _id of the vnfrs record that contains the target vdur
    :param vdu_id: vdu-id-ref of the target VDU. If empty, the VNF case is assumed and the vdur is located by the
        vnfr management "ip-address"
    :param vdu_index: count-index of the target VDU (only used when vdu_id is provided)
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on global timeout, if the target vdur is not found or is in "ERROR" status, if RO
        answers with an unknown format or if the key injection fails 20 times
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 360 iterations with a 10 seconds sleep: 1 hour
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            # the RO ns id was found: the time has been spent waiting for the VM, not for the nsr_id
            raise LcmException("Timeout waiting for VM to be ready at RO: nsr_id={}, vnfr_id={}, vdu_id={}, "
                               "vdu_index={}".format(nsr_id, vnfr_id, vdu_id, vdu_index))

        # NOTE(review): the 'loop' argument of asyncio.sleep() was removed in Python 3.10; drop it when moving
        # to a newer interpreter
        await asyncio.sleep(10, loop=self.loop)
        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        # inject public key into machine
        if pub_key and user:
            # self.logger.debug(logging_text + "Inserting RO key")
            try:
                # db_vnfr was read in the iteration where target_vdu_id was found
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                # a single VM is requested above, so presumably a single entry is received; the first entry
                # decides between success and retry
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # retry in the next iteration (after the 10 seconds sleep), up to 20 times
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            break

    return ip_address
1016
1017 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1018 """
1019 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1020 """
1021 my_vca = vca_deployed_list[vca_index]
1022 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1023 # vdu or kdu: no dependencies
1024 return
1025 timeout = 300
1026 while timeout >= 0:
1027 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1028 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1029 configuration_status_list = db_nsr["configurationStatus"]
1030 for index, vca_deployed in enumerate(configuration_status_list):
1031 if index == vca_index:
1032 # myself
1033 continue
1034 if not my_vca.get("member-vnf-index") or \
1035 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1036 internal_status = configuration_status_list[index].get("status")
1037 if internal_status == 'READY':
1038 continue
1039 elif internal_status == 'BROKEN':
1040 raise LcmException("Configuration aborted because dependent charm/s has failed")
1041 else:
1042 break
1043 else:
1044 # no dependencies, return
1045 return
1046 await asyncio.sleep(10)
1047 timeout -= 1
1048
1049 raise LcmException("Configuration aborted because dependent charm/s timeout")
1050
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
                           vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
    """
    Create (proxy charm) or register (native charm) the execution environment of one VCA, install the
    configuration software on it and execute its initial config primitives.

    The progress is written at the nsr record: 'configurationStatus.<vca_index>' goes through 'CREATING' or
    'REGISTERING', 'INSTALLING SW', 'EXECUTING PRIMITIVE' and 'READY', or 'BROKEN' on any failure.

    :param logging_text: prefix use for logging
    :param vca_index: position of this VCA at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: id of the NSI, used as first part of the namespace. Can be None
    :param db_nsr: nsr record. "_id" and "_admin.deployed.VCA" are read from it
    :param db_vnfr: vnfr record, or None for a NS level configuration
    :param vdu_id: id of the VDU, or None when the configuration is not for a VDU
    :param kdu_name: not used inside this method
    :param vdu_index: index of the VDU instance; 0 is used for the namespace when empty
    :param config_descriptor: configuration section of the descriptor. The keys "juju", "config-access" and
        "initial-config-primitive" are read. The "initial-config-primitive" list is sorted in place by 'seq'
    :param deploy_params: dictionary with the parameters for the primitives. It is modified: "rw_mgmt_ip" is
        always added, and "ns_config_info" for a NS level configuration
    :param base_folder: dictionary with the keys "folder" and "pkg-dir", used to compose the charm path
    :param nslcmop_id: operation id, where the stage is written
    :return: None
    :raises Exception: any failure is re-raised as an Exception whose text starts with the failing step
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    # location of this VCA at database, provided to every N2VC call
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace format: <nsi>.<ns>[.<vnfr>[.<vdu>-<vdu_index>]]
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor contains explicitly 'proxy: false' (native charm)
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                # first 'ssh-username' parameter found wins; the search stops there
                username = next(
                    (param["value"]
                     for config_primitive in config_descriptor["initial-config-primitive"]
                     for param in config_primitive.get("parameter", ())
                     if param["name"] == "ssh-username"),
                    username
                )
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # a missing or non numeric 'seq' is only logged; the descriptor order is used
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage = 'Stage 3/5: running Day-1 primitives for VDU'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage = 'Stage 4/5: running Day-1 primitives for VNF'
        else:
            # NS
            stage = 'Stage 5/5: running Day-1 primitives for NS'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        # 'step' tells at which point of the sequence the failure happened
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1293
1294 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1295 error_description: str = None):
1296 try:
1297 db_dict = dict()
1298 if ns_state:
1299 db_dict["nsState"] = ns_state
1300 db_dict["currentOperation"] = current_operation
1301 db_dict["currentOperationID"] = current_operation_id
1302 db_dict["errorDescription"] = error_description
1303 self.update_db_2("nsrs", nsr_id, db_dict)
1304 except Exception as e:
1305 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1306
1307 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1308 try:
1309 db_dict = dict()
1310 db_dict['queuePosition'] = queuePosition
1311 db_dict['stage'] = stage
1312 if error_message:
1313 db_dict['errorMessage'] = error_message
1314 self.update_db_2("nslcmops", op_id, db_dict)
1315 except Exception as e:
1316 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1317
1318 def _write_all_config_status(self, nsr_id: str, status: str):
1319 try:
1320 # nsrs record
1321 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1322 # configurationStatus
1323 config_status = db_nsr.get('configurationStatus')
1324 if config_status:
1325 # update status
1326 db_dict = dict()
1327 db_dict['configurationStatus'] = list()
1328 for c in config_status:
1329 c['status'] = status
1330 db_dict['configurationStatus'].append(c)
1331 self.update_db_2("nsrs", nsr_id, db_dict)
1332
1333 except Exception as e:
1334 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1335
1336 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1337 element_under_configuration: str = None, element_type: str = None):
1338
1339 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1340 # .format(vca_index, status))
1341
1342 try:
1343 db_path = 'configurationStatus.{}.'.format(vca_index)
1344 db_dict = dict()
1345 if status:
1346 db_dict[db_path + 'status'] = status
1347 if element_under_configuration:
1348 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1349 if element_type:
1350 db_dict[db_path + 'elementType'] = element_type
1351 self.update_db_2("nsrs", nsr_id, db_dict)
1352 except Exception as e:
1353 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1354 .format(status, nsr_id, vca_index, e))
1355
1356 async def instantiate(self, nsr_id, nslcmop_id):
1357 """
1358
1359 :param nsr_id: ns instance to deploy
1360 :param nslcmop_id: operation to run
1361 :return:
1362 """
1363
1364 # Try to lock HA task here
1365 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1366 if not task_is_locked_by_me:
1367 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1368 return
1369
1370 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1371 self.logger.debug(logging_text + "Enter")
1372
1373 # get all needed from database
1374
1375 # database nsrs record
1376 db_nsr = None
1377
1378 # database nslcmops record
1379 db_nslcmop = None
1380
1381 # update operation on nsrs
1382 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1383 "_admin.current-operation": nslcmop_id,
1384 "_admin.operation-type": "instantiate"}
1385 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1386
1387 # update operation on nslcmops
1388 db_nslcmop_update = {}
1389
1390 nslcmop_operation_state = None
1391 db_vnfrs = {} # vnf's info indexed by member-index
1392 # n2vc_info = {}
1393 task_instantiation_list = []
1394 task_instantiation_info = {} # from task to info text
1395 exc = None
1396 try:
1397 # wait for any previous tasks in process
1398 step = "Waiting for previous operations to terminate"
1399 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1400
1401 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1402
1403 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1404 self._write_ns_status(
1405 nsr_id=nsr_id,
1406 ns_state="BUILDING",
1407 current_operation="INSTANTIATING",
1408 current_operation_id=nslcmop_id
1409 )
1410
1411 # read from db: operation
1412 step = "Getting nslcmop={} from db".format(nslcmop_id)
1413 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1414 ns_params = db_nslcmop.get("operationParams")
1415 if ns_params and ns_params.get("timeout_ns_deploy"):
1416 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1417 else:
1418 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1419
1420 # read from db: ns
1421 step = "Getting nsr={} from db".format(nsr_id)
1422 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1423 # nsd is replicated into ns (no db read)
1424 nsd = db_nsr["nsd"]
1425 # nsr_name = db_nsr["name"] # TODO short-name??
1426
1427 # read from db: vnf's of this ns
1428 step = "Getting vnfrs from db"
1429 self.logger.debug(logging_text + step)
1430 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1431
1432 # read from db: vnfd's for every vnf
1433 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1434 db_vnfds = {} # every vnfd data indexed by vnf id
1435 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1436
1437 self._write_op_status(
1438 op_id=nslcmop_id,
1439 stage='Stage 1/5: preparation of the environment',
1440 queuePosition=0
1441 )
1442
1443 # for each vnf in ns, read vnfd
1444 for vnfr in db_vnfrs_list:
1445 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1446 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1447 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1448 # if we haven't this vnfd, read it from db
1449 if vnfd_id not in db_vnfds:
1450 # read from db
1451 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1452 self.logger.debug(logging_text + step)
1453 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1454
1455 # store vnfd
1456 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1457 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1458 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1459
1460 # Get or generates the _admin.deployed.VCA list
1461 vca_deployed_list = None
1462 if db_nsr["_admin"].get("deployed"):
1463 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1464 if vca_deployed_list is None:
1465 vca_deployed_list = []
1466 configuration_status_list = []
1467 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1468 db_nsr_update["configurationStatus"] = configuration_status_list
1469 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1470 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1471 elif isinstance(vca_deployed_list, dict):
1472 # maintain backward compatibility. Change a dict to list at database
1473 vca_deployed_list = list(vca_deployed_list.values())
1474 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1475 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1476
1477 db_nsr_update["detailed-status"] = "creating"
1478 db_nsr_update["operational-status"] = "init"
1479
1480 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1481 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1482 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1483
1484 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1485 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1486 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1487
1488 # n2vc_redesign STEP 2 Deploy Network Scenario
1489
1490 self._write_op_status(
1491 op_id=nslcmop_id,
1492 stage='Stage 2/5: deployment of VMs and execution environments'
1493 )
1494
1495 self.logger.debug(logging_text + "Before deploy_kdus")
1496 # Call to deploy_kdus in case exists the "vdu:kdu" param
1497 task_kdu = asyncio.ensure_future(
1498 self.deploy_kdus(
1499 logging_text=logging_text,
1500 nsr_id=nsr_id,
1501 db_nsr=db_nsr,
1502 db_vnfrs=db_vnfrs,
1503 db_vnfds=db_vnfds
1504 )
1505 )
1506 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1507 task_instantiation_info[task_kdu] = "Deploy KDUs"
1508 task_instantiation_list.append(task_kdu)
1509 # n2vc_redesign STEP 1 Get VCA public ssh-key
1510 # feature 1429. Add n2vc public key to needed VMs
1511 n2vc_key = self.n2vc.get_public_key()
1512 n2vc_key_list = [n2vc_key]
1513 if self.vca_config.get("public_key"):
1514 n2vc_key_list.append(self.vca_config["public_key"])
1515
1516 task_ro = asyncio.ensure_future(
1517 self.instantiate_RO(
1518 logging_text=logging_text,
1519 nsr_id=nsr_id,
1520 nsd=nsd,
1521 db_nsr=db_nsr,
1522 db_nslcmop=db_nslcmop,
1523 db_vnfrs=db_vnfrs,
1524 db_vnfds_ref=db_vnfds_ref,
1525 n2vc_key_list=n2vc_key_list
1526 )
1527 )
1528 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1529 task_instantiation_info[task_ro] = "Deploy at VIM"
1530 task_instantiation_list.append(task_ro)
1531
1532 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1533 step = "Deploying proxy and native charms"
1534 self.logger.debug(logging_text + step)
1535
1536 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1537 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1538 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1539 vnfd_id = c_vnf["vnfd-id-ref"]
1540 vnfd = db_vnfds_ref[vnfd_id]
1541 member_vnf_index = str(c_vnf["member-vnf-index"])
1542 db_vnfr = db_vnfrs[member_vnf_index]
1543 base_folder = vnfd["_admin"]["storage"]
1544 vdu_id = None
1545 vdu_index = 0
1546 vdu_name = None
1547 kdu_name = None
1548
1549 # Get additional parameters
1550 deploy_params = {}
1551 if db_vnfr.get("additionalParamsForVnf"):
1552 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1553
1554 descriptor_config = vnfd.get("vnf-configuration")
1555 if descriptor_config and descriptor_config.get("juju"):
1556 self._deploy_n2vc(
1557 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1558 db_nsr=db_nsr,
1559 db_vnfr=db_vnfr,
1560 nslcmop_id=nslcmop_id,
1561 nsr_id=nsr_id,
1562 nsi_id=nsi_id,
1563 vnfd_id=vnfd_id,
1564 vdu_id=vdu_id,
1565 kdu_name=kdu_name,
1566 member_vnf_index=member_vnf_index,
1567 vdu_index=vdu_index,
1568 vdu_name=vdu_name,
1569 deploy_params=deploy_params,
1570 descriptor_config=descriptor_config,
1571 base_folder=base_folder,
1572 task_instantiation_list=task_instantiation_list,
1573 task_instantiation_info=task_instantiation_info
1574 )
1575
1576 # Deploy charms for each VDU that supports one.
1577 for vdud in get_iterable(vnfd, 'vdu'):
1578 vdu_id = vdud["id"]
1579 descriptor_config = vdud.get('vdu-configuration')
1580 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1581 if vdur.get("additionalParams"):
1582 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1583 else:
1584 deploy_params_vdu = deploy_params
1585 if descriptor_config and descriptor_config.get("juju"):
1586 # look for vdu index in the db_vnfr["vdu"] section
1587 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1588 # if vdur["vdu-id-ref"] == vdu_id:
1589 # break
1590 # else:
1591 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1592 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1593 # vdu_name = vdur.get("name")
1594 vdu_name = None
1595 kdu_name = None
1596 for vdu_index in range(int(vdud.get("count", 1))):
1597 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1598 self._deploy_n2vc(
1599 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1600 member_vnf_index, vdu_id, vdu_index),
1601 db_nsr=db_nsr,
1602 db_vnfr=db_vnfr,
1603 nslcmop_id=nslcmop_id,
1604 nsr_id=nsr_id,
1605 nsi_id=nsi_id,
1606 vnfd_id=vnfd_id,
1607 vdu_id=vdu_id,
1608 kdu_name=kdu_name,
1609 member_vnf_index=member_vnf_index,
1610 vdu_index=vdu_index,
1611 vdu_name=vdu_name,
1612 deploy_params=deploy_params_vdu,
1613 descriptor_config=descriptor_config,
1614 base_folder=base_folder,
1615 task_instantiation_list=task_instantiation_list,
1616 task_instantiation_info=task_instantiation_info
1617 )
1618 for kdud in get_iterable(vnfd, 'kdu'):
1619 kdu_name = kdud["name"]
1620 descriptor_config = kdud.get('kdu-configuration')
1621 if descriptor_config and descriptor_config.get("juju"):
1622 vdu_id = None
1623 vdu_index = 0
1624 vdu_name = None
1625 # look for vdu index in the db_vnfr["vdu"] section
1626 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1627 # if vdur["vdu-id-ref"] == vdu_id:
1628 # break
1629 # else:
1630 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1631 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1632 # vdu_name = vdur.get("name")
1633 # vdu_name = None
1634
1635 self._deploy_n2vc(
1636 logging_text=logging_text,
1637 db_nsr=db_nsr,
1638 db_vnfr=db_vnfr,
1639 nslcmop_id=nslcmop_id,
1640 nsr_id=nsr_id,
1641 nsi_id=nsi_id,
1642 vnfd_id=vnfd_id,
1643 vdu_id=vdu_id,
1644 kdu_name=kdu_name,
1645 member_vnf_index=member_vnf_index,
1646 vdu_index=vdu_index,
1647 vdu_name=vdu_name,
1648 deploy_params=deploy_params,
1649 descriptor_config=descriptor_config,
1650 base_folder=base_folder,
1651 task_instantiation_list=task_instantiation_list,
1652 task_instantiation_info=task_instantiation_info
1653 )
1654
1655 # Check if this NS has a charm configuration
1656 descriptor_config = nsd.get("ns-configuration")
1657 if descriptor_config and descriptor_config.get("juju"):
1658 vnfd_id = None
1659 db_vnfr = None
1660 member_vnf_index = None
1661 vdu_id = None
1662 kdu_name = None
1663 vdu_index = 0
1664 vdu_name = None
1665
1666 # Get additional parameters
1667 deploy_params = {}
1668 if db_nsr.get("additionalParamsForNs"):
1669 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1670 base_folder = nsd["_admin"]["storage"]
1671 self._deploy_n2vc(
1672 logging_text=logging_text,
1673 db_nsr=db_nsr,
1674 db_vnfr=db_vnfr,
1675 nslcmop_id=nslcmop_id,
1676 nsr_id=nsr_id,
1677 nsi_id=nsi_id,
1678 vnfd_id=vnfd_id,
1679 vdu_id=vdu_id,
1680 kdu_name=kdu_name,
1681 member_vnf_index=member_vnf_index,
1682 vdu_index=vdu_index,
1683 vdu_name=vdu_name,
1684 deploy_params=deploy_params,
1685 descriptor_config=descriptor_config,
1686 base_folder=base_folder,
1687 task_instantiation_list=task_instantiation_list,
1688 task_instantiation_info=task_instantiation_info
1689 )
1690
1691 # Wait until all tasks of "task_instantiation_list" have been finished
1692
1693 error_text_list = []
1694
1695 # let's begin with all OK
1696 instantiated_ok = True
1697 # let's begin with RO 'running' status (later we can change it)
1698 db_nsr_update["operational-status"] = "running"
1699 # let's begin with VCA 'configured' status (later we can change it)
1700 db_nsr_update["config-status"] = "configured"
1701
1702 step = "Waiting for tasks to be finished"
1703 if task_instantiation_list:
1704 # wait for all tasks completion
1705 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1706
1707 for task in pending:
1708 instantiated_ok = False
1709 if task in (task_ro, task_kdu):
1710 # RO or KDU task is pending
1711 db_nsr_update["operational-status"] = "failed"
1712 else:
1713 # A N2VC task is pending
1714 db_nsr_update["config-status"] = "failed"
1715 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1716 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1717 for task in done:
1718 if task.cancelled():
1719 instantiated_ok = False
1720 if task in (task_ro, task_kdu):
1721 # RO or KDU task was cancelled
1722 db_nsr_update["operational-status"] = "failed"
1723 else:
1724 # A N2VC was cancelled
1725 db_nsr_update["config-status"] = "failed"
1726 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1727 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1728 else:
1729 exc = task.exception()
1730 if exc:
1731 instantiated_ok = False
1732 if task in (task_ro, task_kdu):
1733 # RO or KDU task raised an exception
1734 db_nsr_update["operational-status"] = "failed"
1735 else:
1736 # A N2VC task raised an exception
1737 db_nsr_update["config-status"] = "failed"
1738 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1739
1740 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1741 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1742 else:
1743 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1744 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1745 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1746 else:
1747 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1748
1749 if error_text_list:
1750 error_text = "\n".join(error_text_list)
1751 db_nsr_update["detailed-status"] = error_text
1752 db_nslcmop_update["detailed-status"] = error_text
1753 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1754 db_nslcmop_update["statusEnteredTime"] = time()
1755 else:
1756 # all is done
1757 db_nsr_update["detailed-status"] = "done"
1758 db_nslcmop_update["detailed-status"] = "done"
1759 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1760 db_nslcmop_update["statusEnteredTime"] = time()
1761
1762 except (ROclient.ROClientException, DbException, LcmException) as e:
1763 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1764 exc = e
1765 except asyncio.CancelledError:
1766 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1767 exc = "Operation was cancelled"
1768 except Exception as e:
1769 exc = traceback.format_exc()
1770 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1771 exc_info=True)
1772 finally:
1773 if exc:
1774 if db_nsr:
1775 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1776 db_nsr_update["operational-status"] = "failed"
1777 db_nsr_update["config-status"] = "failed"
1778 if db_nslcmop:
1779 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1780 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1781 db_nslcmop_update["statusEnteredTime"] = time()
1782 try:
1783 if db_nsr:
1784 db_nsr_update["_admin.nslcmop"] = None
1785 db_nsr_update["_admin.current-operation"] = None
1786 db_nsr_update["_admin.operation-type"] = None
1787 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1788
1789 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1790 ns_state = None
1791 error_description = None
1792 if instantiated_ok:
1793 ns_state = "READY"
1794 else:
1795 ns_state = "BROKEN"
1796 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1797
1798 self._write_ns_status(
1799 nsr_id=nsr_id,
1800 ns_state=ns_state,
1801 current_operation="IDLE",
1802 current_operation_id=None,
1803 error_description=error_description
1804 )
1805
1806 self._write_op_status(
1807 op_id=nslcmop_id,
1808 error_message=error_description
1809 )
1810
1811 if db_nslcmop_update:
1812 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1813
1814 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1815
1816 except DbException as e:
1817 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1818
1819 if nslcmop_operation_state:
1820 try:
1821 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1822 "operationState": nslcmop_operation_state},
1823 loop=self.loop)
1824 except Exception as e:
1825 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1826
1827 self.logger.debug(logging_text + "Exit")
1828 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1829
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations in which the VCA stored at position 'vca_index' takes part.

    Steps:
      1. find all relations (ns-configuration and vnf-configuration) that mention this VCA
      2. poll the nsr record until both peers of a relation have 'config_sw_installed'
      3. add the relation through N2VC as soon as both peers are ready

    A relation is dropped without being added when one of its peers is reported with status 'BROKEN'
    in the nsr 'configurationStatus' list.

    :param logging_text: prefix for log messages
    :param nsr_id: _id of the nsr record at database
    :param vca_index: position of this VCA inside nsr '_admin.deployed.VCA'
    :param timeout: maximum time, in seconds, to keep polling for the peers
    :return: True when no relation is left to process; False on timeout or on any exception
    """
    poll_interval = 5.0  # seconds between two reads of the nsr record

    def _peer_ids(relation):
        # ids of the two entities linked by a relation descriptor
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    async def _process_relations(db_nsr, relations, vca_key):
        # Add every relation whose two peers are ready and drop those with a broken peer.
        # 'vca_key' is the VCA field compared against the entity ids: 'member-vnf-index' for NS relations,
        # 'vdu_id' for VNF relations. 'relations' is modified in place.
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        # iterate over a snapshot, because entries are removed from 'relations' inside the loop
        for r in list(relations):
            from_id, to_id = _peer_ids(r)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # empty slots and peers without the configuration software installed are not ready
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=r.get('entities')[0].get('endpoint'),
                    endpoint_2=r.get('entities')[1].get('endpoint'))
                # remove entry from relations list
                relations.remove(r)
                continue

            # check failed peers; this check is best effort and must not abort the whole operation
            try:
                vca_status_list = db_nsr.get('configurationStatus') or ()
                for vca, vca_status in zip(vca_list, vca_status_list):
                    if not vca or not vca_status:
                        continue
                    if vca.get(vca_key) in (from_id, to_id) and vca_status.get('status') == 'BROKEN':
                        # peer broken: remove relation from list (only once, even if both peers are broken)
                        relations.remove(r)
                        break
            except (AttributeError, TypeError) as e:
                self.logger.debug(logging_text + ' cannot check peers status: {}'.format(e))

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is one of the peers
        ns_relations = [
            r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
            if my_vca.get('member-vnf-index') in _peer_ids(r)
        ]

        # read all vnf-configuration relations where this VCA is one of the peers
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ():
                if my_vca.get('vdu_id') in _peer_ids(r):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEPS 2 and 3: wait for the peers and add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need the updated record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            await _process_relations(db_nsr, ns_relations, 'member-vnf-index')
            await _process_relations(db_nsr, vnf_relations, 'vdu_id')

            # finish as soon as nothing is pending, without a useless last wait
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(poll_interval)

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
1989
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
    """
    Launch the installation of every KDU found in the vnfr records and wait until all of them finish.

    For each 'kdur' an item is written at nsr '_admin.deployed.K8s.<index>' and an install task is started at
    the helm connector (kdur with 'helm-chart') or at the juju connector (kdur with 'juju-bundle').

    :param logging_text: prefix for log messages
    :param nsr_id: _id of the nsr record at database
    :param db_nsr: nsr record content (not used by this method)
    :param db_vnfrs: dictionary with the vnfr records; only its values are used
    :param db_vnfds: dictionary with the vnfd records, indexed by the vnfr 'vnfd-id'
    :return: None
    :raises LcmException: on any failure, including when some KDU cannot be deployed
    """
    deployed_ok = True
    # 'step' and 'db_nsr_update' are read at the except/finally clauses, so they must exist before the try
    step = "Initializing KDU deployment"
    db_nsr_update = {"_admin.deployed.K8s": []}

    # cache of k8scluster _id -> id of the cluster at the connector, one dictionary per cluster type
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # Return the connector id of a k8s cluster, reading database only the first time
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}   # task -> database path prefix of the KDU it installs
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                vnfd_id = vnfr_data.get('vnfd-id')
                pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"
                    # a KDU that cannot be launched is a failed deployment, same as an unusable cluster
                    deployed_ok = False

                if not error_text:
                    # check if kdumodel is a file and exists
                    try:
                        # path format: /vnfdid/pkkdir/kdumodel
                        filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                    except Exception:
                        # it is not a file: deliberately ignored, kdumodel is used as it comes in the record
                        pass
                    try:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                    except LcmException as e:
                        error_text = str(e)
                        deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # NOTE(review): 'index' is not incremented here, so the next KDU reuses this position
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
                           "{}".format(index)}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600,
                                                    kdu_name=kdur["kdu-name"])
                    )

                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1

        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            pending_list = list(pending_tasks.keys())
            while pending_list:
                done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                             return_when=asyncio.FIRST_COMPLETED)
                if not done_list:   # timeout
                    for task in pending_list:
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                    deployed_ok = False
                    break
                for task in done_list:
                    if task.cancelled():
                        # task.exception() would raise CancelledError on a cancelled task
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "Cancelled"
                        deployed_ok = False
                        continue
                    exc = task.exception()
                    if exc:
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                        deployed_ok = False
                    else:
                        db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)
        self.logger.error(msg)
        raise LcmException(msg) from e
    finally:
        # write whatever is still pending, also when leaving because of an exception
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2114
2115 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2116 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2117 base_folder, task_instantiation_list, task_instantiation_info):
2118 # launch instantiate_N2VC in a asyncio task and register task object
2119 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2120 # if not found, create one entry and update database
2121
2122 # fill db_nsr._admin.deployed.VCA.<index>
2123 vca_index = -1
2124 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2125 if not vca_deployed:
2126 continue
2127 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2128 vca_deployed.get("vdu_id") == vdu_id and \
2129 vca_deployed.get("kdu_name") == kdu_name and \
2130 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2131 break
2132 else:
2133 # not found, create one.
2134 vca_deployed = {
2135 "member-vnf-index": member_vnf_index,
2136 "vdu_id": vdu_id,
2137 "kdu_name": kdu_name,
2138 "vdu_count_index": vdu_index,
2139 "operational-status": "init", # TODO revise
2140 "detailed-status": "", # TODO revise
2141 "step": "initial-deploy", # TODO revise
2142 "vnfd_id": vnfd_id,
2143 "vdu_name": vdu_name,
2144 }
2145 vca_index += 1
2146
2147 # create VCA and configurationStatus in db
2148 db_dict = {
2149 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2150 "configurationStatus.{}".format(vca_index): dict()
2151 }
2152 self.update_db_2("nsrs", nsr_id, db_dict)
2153
2154 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2155
2156 # Launch task
2157 task_n2vc = asyncio.ensure_future(
2158 self.instantiate_N2VC(
2159 logging_text=logging_text,
2160 vca_index=vca_index,
2161 nsi_id=nsi_id,
2162 db_nsr=db_nsr,
2163 db_vnfr=db_vnfr,
2164 vdu_id=vdu_id,
2165 kdu_name=kdu_name,
2166 vdu_index=vdu_index,
2167 deploy_params=deploy_params,
2168 config_descriptor=descriptor_config,
2169 base_folder=base_folder,
2170 nslcmop_id=nslcmop_id
2171 )
2172 )
2173 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2174 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
2175 task_instantiation_list.append(task_n2vc)
2176
2177 # Check if this VNFD has a configured terminate action
2178 def _has_terminate_config_primitive(self, vnfd):
2179 vnf_config = vnfd.get("vnf-configuration")
2180 if vnf_config and vnf_config.get("terminate-config-primitive"):
2181 return True
2182 else:
2183 return False
2184
2185 @staticmethod
2186 def _get_terminate_config_primitive_seq_list(vnfd):
2187 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2188 # No need to check for existing primitive twice, already done before
2189 vnf_config = vnfd.get("vnf-configuration")
2190 seq_list = vnf_config.get("terminate-config-primitive")
2191 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2192 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2193 return seq_list_sorted
2194
2195 @staticmethod
2196 def _create_nslcmop(nsr_id, operation, params):
2197 """
2198 Creates a ns-lcm-opp content to be stored at database.
2199 :param nsr_id: internal id of the instance
2200 :param operation: instantiate, terminate, scale, action, ...
2201 :param params: user parameters for the operation
2202 :return: dictionary following SOL005 format
2203 """
2204 # Raise exception if invalid arguments
2205 if not (nsr_id and operation and params):
2206 raise LcmException(
2207 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2208 now = time()
2209 _id = str(uuid4())
2210 nslcmop = {
2211 "id": _id,
2212 "_id": _id,
2213 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2214 "operationState": "PROCESSING",
2215 "statusEnteredTime": now,
2216 "nsInstanceId": nsr_id,
2217 "lcmOperationType": operation,
2218 "startTime": now,
2219 "isAutomaticInvocation": False,
2220 "operationParams": params,
2221 "isCancelPending": False,
2222 "links": {
2223 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2224 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2225 }
2226 }
2227 return nslcmop
2228
2229 def _format_additional_params(self, params):
2230 params = params or {}
2231 for key, value in params.items():
2232 if str(value).startswith("!!yaml "):
2233 params[key] = yaml.safe_load(value[7:])
2234 return params
2235
2236 def _get_terminate_primitive_params(self, seq, vnf_index):
2237 primitive = seq.get('name')
2238 primitive_params = {}
2239 params = {
2240 "member_vnf_index": vnf_index,
2241 "primitive": primitive,
2242 "primitive_params": primitive_params,
2243 }
2244 desc_params = {}
2245 return self._map_primitive_params(seq, params, desc_params)
2246
2247 # sub-operations
2248
2249 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2250 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2251 if (op.get('operationState') == 'COMPLETED'):
2252 # b. Skip sub-operation
2253 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2254 return self.SUBOPERATION_STATUS_SKIP
2255 else:
2256 # c. Reintent executing sub-operation
2257 # The sub-operation exists, and operationState != 'COMPLETED'
2258 # Update operationState = 'PROCESSING' to indicate a reintent.
2259 operationState = 'PROCESSING'
2260 detailed_status = 'In progress'
2261 self._update_suboperation_status(
2262 db_nslcmop, op_index, operationState, detailed_status)
2263 # Return the sub-operation index
2264 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2265 # with arguments extracted from the sub-operation
2266 return op_index
2267
2268 # Find a sub-operation where all keys in a matching dictionary must match
2269 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2270 def _find_suboperation(self, db_nslcmop, match):
2271 if (db_nslcmop and match):
2272 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2273 for i, op in enumerate(op_list):
2274 if all(op.get(k) == match[k] for k in match):
2275 return i
2276 return self.SUBOPERATION_STATUS_NOT_FOUND
2277
2278 # Update status for a sub-operation given its index
2279 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2280 # Update DB for HA tasks
2281 q_filter = {'_id': db_nslcmop['_id']}
2282 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2283 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2284 self.db.set_one("nslcmops",
2285 q_filter=q_filter,
2286 update_dict=update_dict,
2287 fail_on_empty=False)
2288
2289 # Add sub-operation, return the index of the added sub-operation
2290 # Optionally, set operationState, detailed-status, and operationType
2291 # Status and type are currently set for 'scale' sub-operations:
2292 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2293 # 'detailed-status' : status message
2294 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2295 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2296 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2297 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2298 RO_nsr_id=None, RO_scaling_info=None):
2299 if not (db_nslcmop):
2300 return self.SUBOPERATION_STATUS_NOT_FOUND
2301 # Get the "_admin.operations" list, if it exists
2302 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2303 op_list = db_nslcmop_admin.get('operations')
2304 # Create or append to the "_admin.operations" list
2305 new_op = {'member_vnf_index': vnf_index,
2306 'vdu_id': vdu_id,
2307 'vdu_count_index': vdu_count_index,
2308 'primitive': primitive,
2309 'primitive_params': mapped_primitive_params}
2310 if operationState:
2311 new_op['operationState'] = operationState
2312 if detailed_status:
2313 new_op['detailed-status'] = detailed_status
2314 if operationType:
2315 new_op['lcmOperationType'] = operationType
2316 if RO_nsr_id:
2317 new_op['RO_nsr_id'] = RO_nsr_id
2318 if RO_scaling_info:
2319 new_op['RO_scaling_info'] = RO_scaling_info
2320 if not op_list:
2321 # No existing operations, create key 'operations' with current operation as first list element
2322 db_nslcmop_admin.update({'operations': [new_op]})
2323 op_list = db_nslcmop_admin.get('operations')
2324 else:
2325 # Existing operations, append operation to list
2326 op_list.append(new_op)
2327
2328 db_nslcmop_update = {'_admin.operations': op_list}
2329 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2330 op_index = len(op_list) - 1
2331 return op_index
2332
2333 # Helper methods for scale() sub-operations
2334
2335 # pre-scale/post-scale:
2336 # Check for 3 different cases:
2337 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2338 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2339 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2340 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2341 operationType, RO_nsr_id=None, RO_scaling_info=None):
2342 # Find this sub-operation
2343 if (RO_nsr_id and RO_scaling_info):
2344 operationType = 'SCALE-RO'
2345 match = {
2346 'member_vnf_index': vnf_index,
2347 'RO_nsr_id': RO_nsr_id,
2348 'RO_scaling_info': RO_scaling_info,
2349 }
2350 else:
2351 match = {
2352 'member_vnf_index': vnf_index,
2353 'primitive': vnf_config_primitive,
2354 'primitive_params': primitive_params,
2355 'lcmOperationType': operationType
2356 }
2357 op_index = self._find_suboperation(db_nslcmop, match)
2358 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2359 # a. New sub-operation
2360 # The sub-operation does not exist, add it.
2361 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2362 # The following parameters are set to None for all kind of scaling:
2363 vdu_id = None
2364 vdu_count_index = None
2365 vdu_name = None
2366 if (RO_nsr_id and RO_scaling_info):
2367 vnf_config_primitive = None
2368 primitive_params = None
2369 else:
2370 RO_nsr_id = None
2371 RO_scaling_info = None
2372 # Initial status for sub-operation
2373 operationState = 'PROCESSING'
2374 detailed_status = 'In progress'
2375 # Add sub-operation for pre/post-scaling (zero or more operations)
2376 self._add_suboperation(db_nslcmop,
2377 vnf_index,
2378 vdu_id,
2379 vdu_count_index,
2380 vdu_name,
2381 vnf_config_primitive,
2382 primitive_params,
2383 operationState,
2384 detailed_status,
2385 operationType,
2386 RO_nsr_id,
2387 RO_scaling_info)
2388 return self.SUBOPERATION_STATUS_NEW
2389 else:
2390 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2391 # or op_index (operationState != 'COMPLETED')
2392 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2393
2394 # Function to return execution_environment id
2395
2396 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2397 for vca in vca_deployed_list:
2398 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2399 return vca["ee_id"]
2400
2401 # Helper methods for terminate()
2402
2403 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2404 """ Create a primitive with params from VNFD
2405 Called from terminate() before deleting instance
2406 Calls action() to execute the primitive """
2407 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2408 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2409 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2410 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2411 db_vnfds = {}
2412 # Loop over VNFRs
2413 for vnfr in db_vnfrs_list:
2414 vnfd_id = vnfr["vnfd-id"]
2415 vnf_index = vnfr["member-vnf-index-ref"]
2416 if vnfd_id not in db_vnfds:
2417 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2418 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2419 db_vnfds[vnfd_id] = vnfd
2420 vnfd = db_vnfds[vnfd_id]
2421 if not self._has_terminate_config_primitive(vnfd):
2422 continue
2423 # Get the primitive's sorted sequence list
2424 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2425 for seq in seq_list:
2426 # For each sequence in list, get primitive and call _ns_execute_primitive()
2427 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2428 vnf_index, seq.get("name"))
2429 self.logger.debug(logging_text + step)
2430 # Create the primitive for each sequence, i.e. "primitive": "touch"
2431 primitive = seq.get('name')
2432 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2433 # The following 3 parameters are currently set to None for 'terminate':
2434 # vdu_id, vdu_count_index, vdu_name
2435 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2436 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2437 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2438 # Add sub-operation
2439 self._add_suboperation(db_nslcmop,
2440 nslcmop_id,
2441 vnf_index,
2442 vdu_id,
2443 vdu_count_index,
2444 vdu_name,
2445 primitive,
2446 mapped_primitive_params)
2447 # Sub-operations: Call _ns_execute_primitive() instead of action()
2448 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2449 # nsr_deployed = db_nsr["_admin"]["deployed"]
2450
2451 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2452 # nsr_id, nslcmop_terminate_action_id)
2453 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2454 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2455 # if result not in result_ok:
2456 # raise LcmException(
2457 # "terminate_primitive_action for vnf_member_index={}",
2458 # " primitive={} fails with error {}".format(
2459 # vnf_index, seq.get("name"), result_detail))
2460
2461 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2462 try:
2463 await self.n2vc.exec_primitive(
2464 ee_id=ee_id,
2465 primitive_name=primitive,
2466 params_dict=mapped_primitive_params
2467 )
2468 except Exception as e:
2469 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2470 raise LcmException(
2471 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2472 .format(vnf_index, seq.get("name"), e),
2473 )
2474
async def _delete_N2VC(self, nsr_id: str):
    """
    Delete the N2VC namespace that belongs to a NS.

    The configuration status of the NS is written as 'TERMINATING' before the namespace is deleted and as
    'DELETED' once the deletion call has returned.
    :param nsr_id: identifier of the NS record; the namespace handed to N2VC is "." followed by this id
    :return: None
    """
    self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
    # the namespace is the NS id prefixed with a dot
    await self.n2vc.delete_namespace(namespace="." + nsr_id)
    self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2480
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS: run its terminate primitives, then delete execution environments, KDUs and RO records.

    Progress and the final result are stored in the "nsrs" and "nslcmops" records. When an operation state has
    been decided, a "terminated" message is written to the "ns" topic. Exceptions raised while terminating are
    caught, logged and reflected in the stored status.
    :param nsr_id: identifier of the NS record ("nsrs" collection)
    :param nslcmop_id: identifier of the terminate operation ("nslcmops" collection)
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "terminate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    pending_tasks = []
    # bound before the try block because the finally clause reads it on every exit path
    terminate_ok = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0
        )

        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to delete: not a failure, so the NS must not be left marked as broken
            terminate_ok = True
            return
        # TODO check if VIM is creating and wait
        # Call internal terminate action
        await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        # remove NS
        try:
            step = "delete execution environment"
            self.logger.debug(logging_text + step)

            # deletion runs in background; it is awaited in the finally clause
            task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
            pending_tasks.append(task_delete_ee)
        except Exception as e:
            msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
            self.logger.error(msg)
            failed_detail.append(msg)

        try:
            # Delete from k8scluster
            step = "delete kdus"
            self.logger.debug(logging_text + step)
            if nsr_deployed:
                for kdu in nsr_deployed.get("K8s", ()):
                    kdu_instance = kdu.get("kdu-instance")
                    if not kdu_instance:
                        continue
                    if kdu.get("k8scluster-type") == "chart":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    elif kdu.get("k8scluster-type") == "juju":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    else:
                        self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                          format(kdu.get("k8scluster-type")))
                        continue
                    pending_tasks.append(task_delete_kdu_instance)
        except LcmException as e:
            msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
            self.logger.error(msg)
            failed_detail.append(msg)

        # remove from RO
        RO_fail = False

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns from VIM"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await self.RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=RO_nsr_id,
                        extra_item="action",
                        extra_item_id=RO_delete_action)

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        # explicit exception instead of assert, which is stripped when running with -O
                        raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    # no 'loop' argument: it was removed from asyncio.sleep in Python 3.10
                    await asyncio.sleep(5)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd from RO"
                await self.RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await self.RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            terminate_ok = False
            self.logger.error(logging_text + "; ".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            terminate_ok = True
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc:
            # an exception aborts the termination even if it was raised after the success flag was set
            terminate_ok = False
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                if terminate_ok:
                    ns_state = "IDLE"
                    error_description = None
                else:
                    ns_state = "BROKEN"
                    # report the aborting exception together with the individual deletion failures
                    error_items = list(failed_detail)
                    if exc:
                        error_items.append(str(exc))
                    error_detail = "; ".join(error_items)
                    error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
                        .format(nslcmop_id, step, error_detail)

                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description
                )

                self._write_op_status(
                    op_id=nslcmop_id,
                    error_message=error_description
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        # wait for pending tasks
        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            done, pending = await asyncio.wait(pending_tasks, timeout=3600)
            # retrieve the outcome of finished tasks so that their failures are logged, not silently dropped
            for task in done:
                if not task.cancelled() and task.exception():
                    self.logger.error(logging_text + "Pending task failed: {}".format(task.exception()))
            if not pending:
                self.logger.debug(logging_text + 'All tasks finished...')
            else:
                self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2786
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the descriptor 'value' (or else 'default-value') is used. If that value is between < > it is looked up at
    instantiation_params.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither a user value nor a descriptor value, or if a <placeholder>
        is not present at instantiation_params
    """
    # callers may pass None when nothing was supplied; treat it as empty instead of failing on 'in'
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter") or ():
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # 'value' takes precedence over 'default-value'
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                placeholder = value[1:-1]
                if placeholder not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[placeholder]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            # structured values are handed over as a one-line yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # drop the "!!yaml " marker (7 characters) and keep the remaining text
            value = value[7:]
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
2829
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
    """
    Execute a primitive on the execution environment of a deployed charm.

    The charm is looked for at db_deployed["VCA"] by member_vnf_index and vdu_id, and additionally by vdu_name
    and vdu_count_index when they are provided.
    :param db_deployed: content of nsr "_admin.deployed"; its "VCA" list is searched
    :param member_vnf_index: member-vnf-index of the target record
    :param vdu_id: vdu_id of the target record
    :param vdu_name: optional vdu name; not used as a filter when empty
    :param vdu_count_index: optional vdu index; not used as a filter when None
    :param primitive: name of the primitive. For "config" the params are wrapped as {"params": ...}
    :param primitive_params: dictionary of params passed to the primitive
    :param retries: number of additional attempts after a failed execution
    :param retries_interval: seconds to wait between attempts
    :return: tuple (output, 'OK') on success or (error text, 'FAIL') on any error. It does not raise
    """
    try:
        # find vca_deployed record for this action
        for vca_deployed in db_deployed["VCA"]:
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # 0 is a valid index, so only None disables this filter
            if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            # vca_deployed not found
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        # get ee_id
        ee_id = vca_deployed.get("ee_id")
        if not ee_id:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "execution environment"
                               .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        if primitive == "config":
            primitive_params = {"params": primitive_params}

        # at least one attempt is always done, followed by up to 'retries' additional ones
        while True:
            try:
                output = await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=primitive_params
                )
                # execution was OK
                return output, 'OK'
            except Exception as e:
                retries -= 1
                if retries < 0:
                    return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
                self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                # wait and retry. No 'loop' argument: it was removed from asyncio.sleep in Python 3.10
                await asyncio.sleep(retries_interval)

    except Exception as e:
        return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2882
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over a NS, VNF, VDU or KDU.

    The target and the primitive are read from "operationParams" of the nslcmop record. For a KDU the
    primitives "upgrade", "rollback" and "status" are sent to the helm or juju k8s connector; otherwise the
    primitive is looked for at the [ns|vnf|vdu]-configuration of the descriptor and executed by
    _ns_execute_primitive. The result is stored at the nslcmop record and an "actioned" message is written
    to the "ns" topic.
    :param nsr_id: identifier of the NS record ("nsrs" collection)
    :param nslcmop_id: identifier of the action operation ("nslcmops" collection)
    :return: tuple (operation state, detail of the operation state); None if the HA lock is not obtained
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")    # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if kdur is None:
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name != kdu["kdu-name"]:
                    continue
                if primitive not in ("upgrade", "rollback", "status"):
                    raise LcmException("Primitive '{}' not supported for KDU '{}'".format(primitive, kdu_name))
                # both connectors are called in the same way, only the connector object differs
                if kdu.get("k8scluster-type") == "chart":
                    k8s_connector = self.k8sclusterhelm
                elif kdu.get("k8scluster-type") == "juju":
                    k8s_connector = self.k8sclusterjuju
                else:
                    raise LcmException("k8scluster-type not defined")

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                if primitive == "upgrade":
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        # a stored model with the form "name:suffix" is reduced to "name"
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    output = await k8s_connector.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                         kdu_instance=kdu.get("kdu-instance"),
                                                         atomic=True, kdu_model=kdu_model,
                                                         params=desc_params, db_dict=db_dict,
                                                         timeout=300)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                elif primitive == "rollback":
                    output = await k8s_connector.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu.get("kdu-instance"),
                                                          db_dict=db_dict)
                else:  # primitive == "status"
                    output = await k8s_connector.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                            kdu_instance=kdu.get("kdu-instance"))
                break
            else:
                raise LcmException("KDU '{}' not found".format(kdu_name))

            # the local state variables are also set, because the finally clause uses them for the
            # notification and for the returned value
            if output:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = output
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'COMPLETED'
            else:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = ''
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'FAILED'
            db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # the vdur can be missing at the vnfr; then there are no vdu additional params to use
                if vdur and vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
                if exc:
                    self._write_op_status(
                        op_id=nslcmop_id,
                        error_message=nslcmop_operation_state_detail
                    )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # returning here also replaces the bare 'return' statements of the try block
        return nslcmop_operation_state, nslcmop_operation_state_detail
3131
3132 async def scale(self, nsr_id, nslcmop_id):
3133
3134 # Try to lock HA task here
3135 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3136 if not task_is_locked_by_me:
3137 return
3138
3139 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3140 self.logger.debug(logging_text + "Enter")
3141 # get all needed from database
3142 db_nsr = None
3143 db_nslcmop = None
3144 db_nslcmop_update = {}
3145 nslcmop_operation_state = None
3146 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
3147 "_admin.current-operation": nslcmop_id,
3148 "_admin.operation-type": "scale"}
3149 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3150 exc = None
3151 # in case of error, indicates what part of scale was failed to put nsr at error status
3152 scale_process = None
3153 old_operational_status = ""
3154 old_config_status = ""
3155 vnfr_scaled = False
3156 try:
3157 # wait for any previous tasks in process
3158 step = "Waiting for previous operations to terminate"
3159 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3160
3161 self._write_ns_status(
3162 nsr_id=nsr_id,
3163 ns_state=None,
3164 current_operation="SCALING",
3165 current_operation_id=nslcmop_id
3166 )
3167
3168 step = "Getting nslcmop from database"
3169 self.logger.debug(step + " after having waited for previous tasks to be completed")
3170 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3171 step = "Getting nsr from database"
3172 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3173
3174 old_operational_status = db_nsr["operational-status"]
3175 old_config_status = db_nsr["config-status"]
3176 step = "Parsing scaling parameters"
3177 # self.logger.debug(step)
3178 db_nsr_update["operational-status"] = "scaling"
3179 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3180 nsr_deployed = db_nsr["_admin"].get("deployed")
3181
3182 #######
3183 nsr_deployed = db_nsr["_admin"].get("deployed")
3184 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3185 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3186 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3187 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3188 #######
3189
3190 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3191 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3192 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3193 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3194 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3195
3196 # for backward compatibility
3197 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3198 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3199 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3200 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3201
3202 step = "Getting vnfr from database"
3203 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3204 step = "Getting vnfd from database"
3205 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3206
3207 step = "Getting scaling-group-descriptor"
3208 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3209 if scaling_descriptor["name"] == scaling_group:
3210 break
3211 else:
3212 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3213 "at vnfd:scaling-group-descriptor".format(scaling_group))
3214
3215 # cooldown_time = 0
3216 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3217 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3218 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3219 # break
3220
3221 # TODO check if ns is in a proper status
3222 step = "Sending scale order to VIM"
3223 nb_scale_op = 0
3224 if not db_nsr["_admin"].get("scaling-group"):
3225 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3226 admin_scale_index = 0
3227 else:
3228 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3229 if admin_scale_info["name"] == scaling_group:
3230 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3231 break
3232 else: # not found, set index one plus last element and add new entry with the name
3233 admin_scale_index += 1
3234 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3235 RO_scaling_info = []
3236 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3237 if scaling_type == "SCALE_OUT":
3238 # count if max-instance-count is reached
3239 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3240 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3241 if nb_scale_op >= max_instance_count:
3242 raise LcmException("reached the limit of {} (max-instance-count) "
3243 "scaling-out operations for the "
3244 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3245
3246 nb_scale_op += 1
3247 vdu_scaling_info["scaling_direction"] = "OUT"
3248 vdu_scaling_info["vdu-create"] = {}
3249 for vdu_scale_info in scaling_descriptor["vdu"]:
3250 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3251 "type": "create", "count": vdu_scale_info.get("count", 1)})
3252 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3253
3254 elif scaling_type == "SCALE_IN":
3255 # count if min-instance-count is reached
3256 min_instance_count = 0
3257 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3258 min_instance_count = int(scaling_descriptor["min-instance-count"])
3259 if nb_scale_op <= min_instance_count:
3260 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3261 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3262 nb_scale_op -= 1
3263 vdu_scaling_info["scaling_direction"] = "IN"
3264 vdu_scaling_info["vdu-delete"] = {}
3265 for vdu_scale_info in scaling_descriptor["vdu"]:
3266 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3267 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3268 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3269
3270 # update VDU_SCALING_INFO with the ip_addresses of the VDUs to delete
3271 vdu_create = vdu_scaling_info.get("vdu-create")
3272 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3273 if vdu_scaling_info["scaling_direction"] == "IN":
3274 for vdur in reversed(db_vnfr["vdur"]):
3275 if vdu_delete.get(vdur["vdu-id-ref"]):
3276 vdu_delete[vdur["vdu-id-ref"]] -= 1
3277 vdu_scaling_info["vdu"].append({
3278 "name": vdur["name"],
3279 "vdu_id": vdur["vdu-id-ref"],
3280 "interface": []
3281 })
3282 for interface in vdur["interfaces"]:
3283 vdu_scaling_info["vdu"][-1]["interface"].append({
3284 "name": interface["name"],
3285 "ip_address": interface["ip-address"],
3286 "mac_address": interface.get("mac-address"),
3287 })
3288 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3289
3290 # PRE-SCALE BEGIN
3291 step = "Executing pre-scale vnf-config-primitive"
3292 if scaling_descriptor.get("scaling-config-action"):
3293 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3294 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3295 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3296 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3297 step = db_nslcmop_update["detailed-status"] = \
3298 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3299
3300 # look for primitive
3301 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3302 if config_primitive["name"] == vnf_config_primitive:
3303 break
3304 else:
3305 raise LcmException(
3306 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3307 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3308 "primitive".format(scaling_group, config_primitive))
3309
3310 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3311 if db_vnfr.get("additionalParamsForVnf"):
3312 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3313
3314 scale_process = "VCA"
3315 db_nsr_update["config-status"] = "configuring pre-scaling"
3316 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3317
3318 # Pre-scale retry check: Check if this sub-operation has been executed before
3319 op_index = self._check_or_add_scale_suboperation(
3320 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3321 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3322 # Skip sub-operation
3323 result = 'COMPLETED'
3324 result_detail = 'Done'
3325 self.logger.debug(logging_text +
3326 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3327 vnf_config_primitive, result, result_detail))
3328 else:
3329 if (op_index == self.SUBOPERATION_STATUS_NEW):
3330 # New sub-operation: Get index of this sub-operation
3331 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3332 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3333 format(vnf_config_primitive))
3334 else:
3335 # Retry: Get registered params for this existing sub-operation
3336 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3337 vnf_index = op.get('member_vnf_index')
3338 vnf_config_primitive = op.get('primitive')
3339 primitive_params = op.get('primitive_params')
3340 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3341 format(vnf_config_primitive))
3342 # Execute the primitive, either with new (first-time) or registered (retry) args
3343 result, result_detail = await self._ns_execute_primitive(
3344 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3345 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3346 vnf_config_primitive, result, result_detail))
3347 # Update operationState = COMPLETED | FAILED
3348 self._update_suboperation_status(
3349 db_nslcmop, op_index, result, result_detail)
3350
3351 if result == "FAILED":
3352 raise LcmException(result_detail)
3353 db_nsr_update["config-status"] = old_config_status
3354 scale_process = None
3355 # PRE-SCALE END
3356
3357 # SCALE RO - BEGIN
3358 # Should this block be skipped if 'RO_nsr_id' == None ?
3359 # if (RO_nsr_id and RO_scaling_info):
3360 if RO_scaling_info:
3361 scale_process = "RO"
3362 # Scale RO retry check: Check if this sub-operation has been executed before
3363 op_index = self._check_or_add_scale_suboperation(
3364 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3365 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3366 # Skip sub-operation
3367 result = 'COMPLETED'
3368 result_detail = 'Done'
3369 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3370 result, result_detail))
3371 else:
3372 if (op_index == self.SUBOPERATION_STATUS_NEW):
3373 # New sub-operation: Get index of this sub-operation
3374 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3375 self.logger.debug(logging_text + "New sub-operation RO")
3376 else:
3377 # Retry: Get registered params for this existing sub-operation
3378 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3379 RO_nsr_id = op.get('RO_nsr_id')
3380 RO_scaling_info = op.get('RO_scaling_info')
3381 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3382 vnf_config_primitive))
3383
3384 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3385 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3386 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3387 # wait until ready
3388 RO_nslcmop_id = RO_desc["instance_action_id"]
3389 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3390
3391 RO_task_done = False
3392 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3393 detailed_status_old = None
3394 self.logger.debug(logging_text + step)
3395
3396 deployment_timeout = 1 * 3600 # One hour
3397 while deployment_timeout > 0:
3398 if not RO_task_done:
3399 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3400 extra_item_id=RO_nslcmop_id)
3401
3402 # deploymentStatus
3403 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3404
3405 ns_status, ns_status_info = self.RO.check_action_status(desc)
3406 if ns_status == "ERROR":
3407 raise ROclient.ROClientException(ns_status_info)
3408 elif ns_status == "BUILD":
3409 detailed_status = step + "; {}".format(ns_status_info)
3410 elif ns_status == "ACTIVE":
3411 RO_task_done = True
3412 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3413 self.logger.debug(logging_text + step)
3414 else:
3415 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3416 else:
3417
3418 if ns_status == "ERROR":
3419 raise ROclient.ROClientException(ns_status_info)
3420 elif ns_status == "BUILD":
3421 detailed_status = step + "; {}".format(ns_status_info)
3422 elif ns_status == "ACTIVE":
3423 step = detailed_status = \
3424 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3425 if not vnfr_scaled:
3426 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3427 vnfr_scaled = True
3428 try:
3429 desc = await self.RO.show("ns", RO_nsr_id)
3430
3431 # deploymentStatus
3432 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3433
3434 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3435 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3436 break
3437 except LcmExceptionNoMgmtIP:
3438 pass
3439 else:
3440 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3441 if detailed_status != detailed_status_old:
3442 self._update_suboperation_status(
3443 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3444 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3445 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3446
3447 await asyncio.sleep(5, loop=self.loop)
3448 deployment_timeout -= 5
3449 if deployment_timeout <= 0:
3450 self._update_suboperation_status(
3451 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3452 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3453
3454 # update VDU_SCALING_INFO with the obtained ip_addresses
3455 if vdu_scaling_info["scaling_direction"] == "OUT":
3456 for vdur in reversed(db_vnfr["vdur"]):
3457 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3458 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3459 vdu_scaling_info["vdu"].append({
3460 "name": vdur["name"],
3461 "vdu_id": vdur["vdu-id-ref"],
3462 "interface": []
3463 })
3464 for interface in vdur["interfaces"]:
3465 vdu_scaling_info["vdu"][-1]["interface"].append({
3466 "name": interface["name"],
3467 "ip_address": interface["ip-address"],
3468 "mac_address": interface.get("mac-address"),
3469 })
3470 del vdu_scaling_info["vdu-create"]
3471
3472 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3473 # SCALE RO - END
3474
3475 scale_process = None
3476 if db_nsr_update:
3477 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3478
3479 # POST-SCALE BEGIN
3480 # execute primitive service POST-SCALING
3481 step = "Executing post-scale vnf-config-primitive"
3482 if scaling_descriptor.get("scaling-config-action"):
3483 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3484 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3485 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3486 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3487 step = db_nslcmop_update["detailed-status"] = \
3488 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3489
3490 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3491 if db_vnfr.get("additionalParamsForVnf"):
3492 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3493
3494 # look for primitive
3495 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3496 if config_primitive["name"] == vnf_config_primitive:
3497 break
3498 else:
3499 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3500 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3501 "match any vnf-configuration:config-primitive".format(scaling_group,
3502 config_primitive))
3503 scale_process = "VCA"
3504 db_nsr_update["config-status"] = "configuring post-scaling"
3505 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3506
3507 # Post-scale retry check: Check if this sub-operation has been executed before
3508 op_index = self._check_or_add_scale_suboperation(
3509 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3510 if op_index == self.SUBOPERATION_STATUS_SKIP:
3511 # Skip sub-operation
3512 result = 'COMPLETED'
3513 result_detail = 'Done'
3514 self.logger.debug(logging_text +
3515 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3516 format(vnf_config_primitive, result, result_detail))
3517 else:
3518 if op_index == self.SUBOPERATION_STATUS_NEW:
3519 # New sub-operation: Get index of this sub-operation
3520 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3521 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3522 format(vnf_config_primitive))
3523 else:
3524 # Retry: Get registered params for this existing sub-operation
3525 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3526 vnf_index = op.get('member_vnf_index')
3527 vnf_config_primitive = op.get('primitive')
3528 primitive_params = op.get('primitive_params')
3529 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3530 format(vnf_config_primitive))
3531 # Execute the primitive, either with new (first-time) or registered (retry) args
3532 result, result_detail = await self._ns_execute_primitive(
3533 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3534 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3535 vnf_config_primitive, result, result_detail))
3536 # Update operationState = COMPLETED | FAILED
3537 self._update_suboperation_status(
3538 db_nslcmop, op_index, result, result_detail)
3539
3540 if result == "FAILED":
3541 raise LcmException(result_detail)
3542 db_nsr_update["config-status"] = old_config_status
3543 scale_process = None
3544 # POST-SCALE END
3545
3546 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3547 db_nslcmop_update["statusEnteredTime"] = time()
3548 db_nslcmop_update["detailed-status"] = "done"
3549 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3550 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3551 else old_operational_status
3552 db_nsr_update["config-status"] = old_config_status
3553 return
3554 except (ROclient.ROClientException, DbException, LcmException) as e:
3555 self.logger.error(logging_text + "Exit Exception {}".format(e))
3556 exc = e
3557 except asyncio.CancelledError:
3558 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3559 exc = "Operation was cancelled"
3560 except Exception as e:
3561 exc = traceback.format_exc()
3562 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3563 finally:
3564 self._write_ns_status(
3565 nsr_id=nsr_id,
3566 ns_state=None,
3567 current_operation="IDLE",
3568 current_operation_id=None
3569 )
3570 if exc:
3571 if db_nslcmop:
3572 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3573 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3574 db_nslcmop_update["statusEnteredTime"] = time()
3575 if db_nsr:
3576 db_nsr_update["operational-status"] = old_operational_status
3577 db_nsr_update["config-status"] = old_config_status
3578 db_nsr_update["detailed-status"] = ""
3579 db_nsr_update["_admin.nslcmop"] = None
3580 if scale_process:
3581 if "VCA" in scale_process:
3582 db_nsr_update["config-status"] = "failed"
3583 if "RO" in scale_process:
3584 db_nsr_update["operational-status"] = "failed"
3585 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3586 exc)
3587 try:
3588 if db_nslcmop and db_nslcmop_update:
3589 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3590 if db_nsr:
3591 db_nsr_update["_admin.current-operation"] = None
3592 db_nsr_update["_admin.operation-type"] = None
3593 db_nsr_update["_admin.nslcmop"] = None
3594 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3595
3596 self._write_ns_status(
3597 nsr_id=nsr_id,
3598 ns_state=None,
3599 current_operation="IDLE",
3600 current_operation_id=None
3601 )
3602
3603 except DbException as e:
3604 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3605 if nslcmop_operation_state:
3606 try:
3607 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3608 "operationState": nslcmop_operation_state},
3609 loop=self.loop)
3610 # if cooldown_time:
3611 # await asyncio.sleep(cooldown_time, loop=self.loop)
3612 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3613 except Exception as e:
3614 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3615 self.logger.debug(logging_text + "Exit")
3616 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")