bug 1045. Show early errors at ns-list when processing
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
47 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
48 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
49 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
50 timeout_charm_delete = 10 * 60
51 timeout_primitive = 10 * 60 # timeout for primitive execution
52
53 SUBOPERATION_STATUS_NOT_FOUND = -1
54 SUBOPERATION_STATUS_NEW = -2
55 SUBOPERATION_STATUS_SKIP = -3
56 task_name_deploy_vca = "Deploying VCA"
57
58 def __init__(self, db, msg, fs, lcm_tasks, config, loop):
59 """
60 Init, Connect to database, filesystem storage, and messaging
61 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
62 :return: None
63 """
64 super().__init__(
65 db=db,
66 msg=msg,
67 fs=fs,
68 logger=logging.getLogger('lcm.ns')
69 )
70
71 self.loop = loop
72 self.lcm_tasks = lcm_tasks
73 self.timeout = config["timeout"]
74 self.ro_config = config["ro_config"]
75 self.vca_config = config["VCA"].copy()
76
77 # create N2VC connector
78 self.n2vc = N2VCJujuConnector(
79 db=self.db,
80 fs=self.fs,
81 log=self.logger,
82 loop=self.loop,
83 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
84 username=self.vca_config.get('user', None),
85 vca_config=self.vca_config,
86 on_update_db=self._on_update_n2vc_db
87 )
88
89 self.k8sclusterhelm = K8sHelmConnector(
90 kubectl_command=self.vca_config.get("kubectlpath"),
91 helm_command=self.vca_config.get("helmpath"),
92 fs=self.fs,
93 log=self.logger,
94 db=self.db,
95 on_update_db=None,
96 )
97
98 self.k8sclusterjuju = K8sJujuConnector(
99 kubectl_command=self.vca_config.get("kubectlpath"),
100 juju_command=self.vca_config.get("jujupath"),
101 fs=self.fs,
102 log=self.logger,
103 db=self.db,
104 on_update_db=None,
105 )
106
107 self.k8scluster_map = {
108 "helm-chart": self.k8sclusterhelm,
109 "chart": self.k8sclusterhelm,
110 "juju-bundle": self.k8sclusterjuju,
111 "juju": self.k8sclusterjuju,
112 }
113 # create RO client
114 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
115
116 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
117
118 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
119
120 try:
121 # TODO filter RO descriptor fields...
122
123 # write to database
124 db_dict = dict()
125 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
126 db_dict['deploymentStatus'] = ro_descriptor
127 self.update_db_2("nsrs", nsrs_id, db_dict)
128
129 except Exception as e:
130 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
131
132 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
133
134 # remove last dot from path (if exists)
135 if path.endswith('.'):
136 path = path[:-1]
137
138 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
139 # .format(table, filter, path, updated_data))
140
141 try:
142
143 nsr_id = filter.get('_id')
144
145 # read ns record from database
146 nsr = self.db.get_one(table='nsrs', q_filter=filter)
147 current_ns_status = nsr.get('nsState')
148
149 # get vca status for NS
150 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
151
152 # vcaStatus
153 db_dict = dict()
154 db_dict['vcaStatus'] = status_dict
155
156 # update configurationStatus for this VCA
157 try:
158 vca_index = int(path[path.rfind(".")+1:])
159
160 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
161 vca_status = vca_list[vca_index].get('status')
162
163 configuration_status_list = nsr.get('configurationStatus')
164 config_status = configuration_status_list[vca_index].get('status')
165
166 if config_status == 'BROKEN' and vca_status != 'failed':
167 db_dict['configurationStatus'][vca_index] = 'READY'
168 elif config_status != 'BROKEN' and vca_status == 'failed':
169 db_dict['configurationStatus'][vca_index] = 'BROKEN'
170 except Exception as e:
171 # not update configurationStatus
172 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
173
174 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
175 # if nsState = 'DEGRADED' check if all is OK
176 is_degraded = False
177 if current_ns_status in ('READY', 'DEGRADED'):
178 error_description = ''
179 # check machines
180 if status_dict.get('machines'):
181 for machine_id in status_dict.get('machines'):
182 machine = status_dict.get('machines').get(machine_id)
183 # check machine agent-status
184 if machine.get('agent-status'):
185 s = machine.get('agent-status').get('status')
186 if s != 'started':
187 is_degraded = True
188 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
189 # check machine instance status
190 if machine.get('instance-status'):
191 s = machine.get('instance-status').get('status')
192 if s != 'running':
193 is_degraded = True
194 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
195 # check applications
196 if status_dict.get('applications'):
197 for app_id in status_dict.get('applications'):
198 app = status_dict.get('applications').get(app_id)
199 # check application status
200 if app.get('status'):
201 s = app.get('status').get('status')
202 if s != 'active':
203 is_degraded = True
204 error_description += 'application {} status={} ; '.format(app_id, s)
205
206 if error_description:
207 db_dict['errorDescription'] = error_description
208 if current_ns_status == 'READY' and is_degraded:
209 db_dict['nsState'] = 'DEGRADED'
210 if current_ns_status == 'DEGRADED' and not is_degraded:
211 db_dict['nsState'] = 'READY'
212
213 # write to database
214 self.update_db_2("nsrs", nsr_id, db_dict)
215
216 except Exception as e:
217 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
218
219 return
220
221 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
222 """
223 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
224 :param vnfd: input vnfd
225 :param new_id: overrides vnf id if provided
226 :param additionalParams: Instantiation params for VNFs provided
227 :param nsrId: Id of the NSR
228 :return: copy of vnfd
229 """
230 try:
231 vnfd_RO = deepcopy(vnfd)
232 # remove unused by RO configuration, monitoring, scaling and internal keys
233 vnfd_RO.pop("_id", None)
234 vnfd_RO.pop("_admin", None)
235 vnfd_RO.pop("vnf-configuration", None)
236 vnfd_RO.pop("monitoring-param", None)
237 vnfd_RO.pop("scaling-group-descriptor", None)
238 vnfd_RO.pop("kdu", None)
239 vnfd_RO.pop("k8s-cluster", None)
240 if new_id:
241 vnfd_RO["id"] = new_id
242
243 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
244 for vdu in get_iterable(vnfd_RO, "vdu"):
245 cloud_init_file = None
246 if vdu.get("cloud-init-file"):
247 base_folder = vnfd["_admin"]["storage"]
248 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
249 vdu["cloud-init-file"])
250 with self.fs.file_open(cloud_init_file, "r") as ci_file:
251 cloud_init_content = ci_file.read()
252 vdu.pop("cloud-init-file", None)
253 elif vdu.get("cloud-init"):
254 cloud_init_content = vdu["cloud-init"]
255 else:
256 continue
257
258 env = Environment()
259 ast = env.parse(cloud_init_content)
260 mandatory_vars = meta.find_undeclared_variables(ast)
261 if mandatory_vars:
262 for var in mandatory_vars:
263 if not additionalParams or var not in additionalParams.keys():
264 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
265 "file, must be provided in the instantiation parameters inside the "
266 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
267 template = Template(cloud_init_content)
268 cloud_init_content = template.render(additionalParams or {})
269 vdu["cloud-init"] = cloud_init_content
270
271 return vnfd_RO
272 except FsException as e:
273 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
274 format(vnfd["id"], vdu["id"], cloud_init_file, e))
275 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
276 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
277 format(vnfd["id"], vdu["id"], e))
278
279 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
280 """
281 Creates a RO ns descriptor from OSM ns_instantiate params
282 :param ns_params: OSM instantiate params
283 :return: The RO ns descriptor
284 """
285 vim_2_RO = {}
286 wim_2_RO = {}
287 # TODO feature 1417: Check that no instantiation is set over PDU
288 # check if PDU forces a concrete vim-network-id and add it
289 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
290
291 def vim_account_2_RO(vim_account):
292 if vim_account in vim_2_RO:
293 return vim_2_RO[vim_account]
294
295 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
296 if db_vim["_admin"]["operationalState"] != "ENABLED":
297 raise LcmException("VIM={} is not available. operationalState={}".format(
298 vim_account, db_vim["_admin"]["operationalState"]))
299 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
300 vim_2_RO[vim_account] = RO_vim_id
301 return RO_vim_id
302
303 def wim_account_2_RO(wim_account):
304 if isinstance(wim_account, str):
305 if wim_account in wim_2_RO:
306 return wim_2_RO[wim_account]
307
308 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
309 if db_wim["_admin"]["operationalState"] != "ENABLED":
310 raise LcmException("WIM={} is not available. operationalState={}".format(
311 wim_account, db_wim["_admin"]["operationalState"]))
312 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
313 wim_2_RO[wim_account] = RO_wim_id
314 return RO_wim_id
315 else:
316 return wim_account
317
318 def ip_profile_2_RO(ip_profile):
319 RO_ip_profile = deepcopy((ip_profile))
320 if "dns-server" in RO_ip_profile:
321 if isinstance(RO_ip_profile["dns-server"], list):
322 RO_ip_profile["dns-address"] = []
323 for ds in RO_ip_profile.pop("dns-server"):
324 RO_ip_profile["dns-address"].append(ds['address'])
325 else:
326 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
327 if RO_ip_profile.get("ip-version") == "ipv4":
328 RO_ip_profile["ip-version"] = "IPv4"
329 if RO_ip_profile.get("ip-version") == "ipv6":
330 RO_ip_profile["ip-version"] = "IPv6"
331 if "dhcp-params" in RO_ip_profile:
332 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
333 return RO_ip_profile
334
335 if not ns_params:
336 return None
337 RO_ns_params = {
338 # "name": ns_params["nsName"],
339 # "description": ns_params.get("nsDescription"),
340 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
341 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
342 # "scenario": ns_params["nsdId"],
343 }
344
345 n2vc_key_list = n2vc_key_list or []
346 for vnfd_ref, vnfd in vnfd_dict.items():
347 vdu_needed_access = []
348 mgmt_cp = None
349 if vnfd.get("vnf-configuration"):
350 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
351 if ssh_required and vnfd.get("mgmt-interface"):
352 if vnfd["mgmt-interface"].get("vdu-id"):
353 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
354 elif vnfd["mgmt-interface"].get("cp"):
355 mgmt_cp = vnfd["mgmt-interface"]["cp"]
356
357 for vdu in vnfd.get("vdu", ()):
358 if vdu.get("vdu-configuration"):
359 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
360 if ssh_required:
361 vdu_needed_access.append(vdu["id"])
362 elif mgmt_cp:
363 for vdu_interface in vdu.get("interface"):
364 if vdu_interface.get("external-connection-point-ref") and \
365 vdu_interface["external-connection-point-ref"] == mgmt_cp:
366 vdu_needed_access.append(vdu["id"])
367 mgmt_cp = None
368 break
369
370 if vdu_needed_access:
371 for vnf_member in nsd.get("constituent-vnfd"):
372 if vnf_member["vnfd-id-ref"] != vnfd_ref:
373 continue
374 for vdu in vdu_needed_access:
375 populate_dict(RO_ns_params,
376 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
377 n2vc_key_list)
378
379 if ns_params.get("vduImage"):
380 RO_ns_params["vduImage"] = ns_params["vduImage"]
381
382 if ns_params.get("ssh_keys"):
383 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
384 for vnf_params in get_iterable(ns_params, "vnf"):
385 for constituent_vnfd in nsd["constituent-vnfd"]:
386 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
387 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
388 break
389 else:
390 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
391 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
392 if vnf_params.get("vimAccountId"):
393 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
394 vim_account_2_RO(vnf_params["vimAccountId"]))
395
396 for vdu_params in get_iterable(vnf_params, "vdu"):
397 # TODO feature 1417: check that this VDU exist and it is not a PDU
398 if vdu_params.get("volume"):
399 for volume_params in vdu_params["volume"]:
400 if volume_params.get("vim-volume-id"):
401 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
402 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
403 volume_params["vim-volume-id"])
404 if vdu_params.get("interface"):
405 for interface_params in vdu_params["interface"]:
406 if interface_params.get("ip-address"):
407 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
408 vdu_params["id"], "interfaces", interface_params["name"],
409 "ip_address"),
410 interface_params["ip-address"])
411 if interface_params.get("mac-address"):
412 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
413 vdu_params["id"], "interfaces", interface_params["name"],
414 "mac_address"),
415 interface_params["mac-address"])
416 if interface_params.get("floating-ip-required"):
417 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
418 vdu_params["id"], "interfaces", interface_params["name"],
419 "floating-ip"),
420 interface_params["floating-ip-required"])
421
422 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
423 if internal_vld_params.get("vim-network-name"):
424 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
425 internal_vld_params["name"], "vim-network-name"),
426 internal_vld_params["vim-network-name"])
427 if internal_vld_params.get("vim-network-id"):
428 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
429 internal_vld_params["name"], "vim-network-id"),
430 internal_vld_params["vim-network-id"])
431 if internal_vld_params.get("ip-profile"):
432 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
433 internal_vld_params["name"], "ip-profile"),
434 ip_profile_2_RO(internal_vld_params["ip-profile"]))
435 if internal_vld_params.get("provider-network"):
436
437 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
438 internal_vld_params["name"], "provider-network"),
439 internal_vld_params["provider-network"].copy())
440
441 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
442 # look for interface
443 iface_found = False
444 for vdu_descriptor in vnf_descriptor["vdu"]:
445 for vdu_interface in vdu_descriptor["interface"]:
446 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
447 if icp_params.get("ip-address"):
448 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
449 vdu_descriptor["id"], "interfaces",
450 vdu_interface["name"], "ip_address"),
451 icp_params["ip-address"])
452
453 if icp_params.get("mac-address"):
454 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
455 vdu_descriptor["id"], "interfaces",
456 vdu_interface["name"], "mac_address"),
457 icp_params["mac-address"])
458 iface_found = True
459 break
460 if iface_found:
461 break
462 else:
463 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
464 "internal-vld:id-ref={} is not present at vnfd:internal-"
465 "connection-point".format(vnf_params["member-vnf-index"],
466 icp_params["id-ref"]))
467
468 for vld_params in get_iterable(ns_params, "vld"):
469 if "ip-profile" in vld_params:
470 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
471 ip_profile_2_RO(vld_params["ip-profile"]))
472
473 if vld_params.get("provider-network"):
474
475 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
476 vld_params["provider-network"].copy())
477
478 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
479 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
480 wim_account_2_RO(vld_params["wimAccountId"])),
481 if vld_params.get("vim-network-name"):
482 RO_vld_sites = []
483 if isinstance(vld_params["vim-network-name"], dict):
484 for vim_account, vim_net in vld_params["vim-network-name"].items():
485 RO_vld_sites.append({
486 "netmap-use": vim_net,
487 "datacenter": vim_account_2_RO(vim_account)
488 })
489 else: # isinstance str
490 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
491 if RO_vld_sites:
492 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
493
494 if vld_params.get("vim-network-id"):
495 RO_vld_sites = []
496 if isinstance(vld_params["vim-network-id"], dict):
497 for vim_account, vim_net in vld_params["vim-network-id"].items():
498 RO_vld_sites.append({
499 "netmap-use": vim_net,
500 "datacenter": vim_account_2_RO(vim_account)
501 })
502 else: # isinstance str
503 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
504 if RO_vld_sites:
505 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
506 if vld_params.get("ns-net"):
507 if isinstance(vld_params["ns-net"], dict):
508 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
509 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
510 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
511 if "vnfd-connection-point-ref" in vld_params:
512 for cp_params in vld_params["vnfd-connection-point-ref"]:
513 # look for interface
514 for constituent_vnfd in nsd["constituent-vnfd"]:
515 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
516 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
517 break
518 else:
519 raise LcmException(
520 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
521 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
522 match_cp = False
523 for vdu_descriptor in vnf_descriptor["vdu"]:
524 for interface_descriptor in vdu_descriptor["interface"]:
525 if interface_descriptor.get("external-connection-point-ref") == \
526 cp_params["vnfd-connection-point-ref"]:
527 match_cp = True
528 break
529 if match_cp:
530 break
531 else:
532 raise LcmException(
533 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
534 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
535 cp_params["member-vnf-index-ref"],
536 cp_params["vnfd-connection-point-ref"],
537 vnf_descriptor["id"]))
538 if cp_params.get("ip-address"):
539 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
540 vdu_descriptor["id"], "interfaces",
541 interface_descriptor["name"], "ip_address"),
542 cp_params["ip-address"])
543 if cp_params.get("mac-address"):
544 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
545 vdu_descriptor["id"], "interfaces",
546 interface_descriptor["name"], "mac_address"),
547 cp_params["mac-address"])
548 return RO_ns_params
549
550 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
551 # make a copy to do not change
552 vdu_create = copy(vdu_create)
553 vdu_delete = copy(vdu_delete)
554
555 vdurs = db_vnfr.get("vdur")
556 if vdurs is None:
557 vdurs = []
558 vdu_index = len(vdurs)
559 while vdu_index:
560 vdu_index -= 1
561 vdur = vdurs[vdu_index]
562 if vdur.get("pdu-type"):
563 continue
564 vdu_id_ref = vdur["vdu-id-ref"]
565 if vdu_create and vdu_create.get(vdu_id_ref):
566 for index in range(0, vdu_create[vdu_id_ref]):
567 vdur = deepcopy(vdur)
568 vdur["_id"] = str(uuid4())
569 vdur["count-index"] += 1
570 vdurs.insert(vdu_index+1+index, vdur)
571 del vdu_create[vdu_id_ref]
572 if vdu_delete and vdu_delete.get(vdu_id_ref):
573 del vdurs[vdu_index]
574 vdu_delete[vdu_id_ref] -= 1
575 if not vdu_delete[vdu_id_ref]:
576 del vdu_delete[vdu_id_ref]
577 # check all operations are done
578 if vdu_create or vdu_delete:
579 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
580 vdu_create))
581 if vdu_delete:
582 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
583 vdu_delete))
584
585 vnfr_update = {"vdur": vdurs}
586 db_vnfr["vdur"] = vdurs
587 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
588
589 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
590 """
591 Updates database nsr with the RO info for the created vld
592 :param ns_update_nsr: dictionary to be filled with the updated info
593 :param db_nsr: content of db_nsr. This is also modified
594 :param nsr_desc_RO: nsr descriptor from RO
595 :return: Nothing, LcmException is raised on errors
596 """
597
598 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
599 for net_RO in get_iterable(nsr_desc_RO, "nets"):
600 if vld["id"] != net_RO.get("ns_net_osm_id"):
601 continue
602 vld["vim-id"] = net_RO.get("vim_net_id")
603 vld["name"] = net_RO.get("vim_name")
604 vld["status"] = net_RO.get("status")
605 vld["status-detailed"] = net_RO.get("error_msg")
606 ns_update_nsr["vld.{}".format(vld_index)] = vld
607 break
608 else:
609 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
610
611 def set_vnfr_at_error(self, db_vnfrs, error_text):
612 try:
613 for db_vnfr in db_vnfrs.values():
614 vnfr_update = {"status": "ERROR"}
615 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
616 if "status" not in vdur:
617 vdur["status"] = "ERROR"
618 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
619 if error_text:
620 vdur["status-detailed"] = str(error_text)
621 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
622 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
623 except DbException as e:
624 self.logger.error("Cannot update vnf. {}".format(e))
625
626 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
627 """
628 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
629 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
630 :param nsr_desc_RO: nsr descriptor from RO
631 :return: Nothing, LcmException is raised on errors
632 """
633 for vnf_index, db_vnfr in db_vnfrs.items():
634 for vnf_RO in nsr_desc_RO["vnfs"]:
635 if vnf_RO["member_vnf_index"] != vnf_index:
636 continue
637 vnfr_update = {}
638 if vnf_RO.get("ip_address"):
639 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
640 elif not db_vnfr.get("ip-address"):
641 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
642 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
643
644 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
645 vdur_RO_count_index = 0
646 if vdur.get("pdu-type"):
647 continue
648 for vdur_RO in get_iterable(vnf_RO, "vms"):
649 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
650 continue
651 if vdur["count-index"] != vdur_RO_count_index:
652 vdur_RO_count_index += 1
653 continue
654 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
655 if vdur_RO.get("ip_address"):
656 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
657 else:
658 vdur["ip-address"] = None
659 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
660 vdur["name"] = vdur_RO.get("vim_name")
661 vdur["status"] = vdur_RO.get("status")
662 vdur["status-detailed"] = vdur_RO.get("error_msg")
663 for ifacer in get_iterable(vdur, "interfaces"):
664 for interface_RO in get_iterable(vdur_RO, "interfaces"):
665 if ifacer["name"] == interface_RO.get("internal_name"):
666 ifacer["ip-address"] = interface_RO.get("ip_address")
667 ifacer["mac-address"] = interface_RO.get("mac_address")
668 break
669 else:
670 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
671 "from VIM info"
672 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
673 vnfr_update["vdur.{}".format(vdu_index)] = vdur
674 break
675 else:
676 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
677 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
678
679 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
680 for net_RO in get_iterable(nsr_desc_RO, "nets"):
681 if vld["id"] != net_RO.get("vnf_net_osm_id"):
682 continue
683 vld["vim-id"] = net_RO.get("vim_net_id")
684 vld["name"] = net_RO.get("vim_name")
685 vld["status"] = net_RO.get("status")
686 vld["status-detailed"] = net_RO.get("error_msg")
687 vnfr_update["vld.{}".format(vld_index)] = vld
688 break
689 else:
690 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
691 vnf_index, vld["id"]))
692
693 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
694 break
695
696 else:
697 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
698
699 def _get_ns_config_info(self, nsr_id):
700 """
701 Generates a mapping between vnf,vdu elements and the N2VC id
702 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
703 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
704 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
705 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
706 """
707 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
708 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
709 mapping = {}
710 ns_config_info = {"osm-config-mapping": mapping}
711 for vca in vca_deployed_list:
712 if not vca["member-vnf-index"]:
713 continue
714 if not vca["vdu_id"]:
715 mapping[vca["member-vnf-index"]] = vca["application"]
716 else:
717 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
718 vca["application"]
719 return ns_config_info
720
721 @staticmethod
722 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
723 """
724 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
725 primitives as verify-ssh-credentials, or config when needed
726 :param desc_primitive_list: information of the descriptor
727 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
728 this element contains a ssh public key
729 :return: The modified list. Can ba an empty list, but always a list
730 """
731 if desc_primitive_list:
732 primitive_list = desc_primitive_list.copy()
733 else:
734 primitive_list = []
735 # look for primitive config, and get the position. None if not present
736 config_position = None
737 for index, primitive in enumerate(primitive_list):
738 if primitive["name"] == "config":
739 config_position = index
740 break
741
742 # for NS, add always a config primitive if not present (bug 874)
743 if not vca_deployed["member-vnf-index"] and config_position is None:
744 primitive_list.insert(0, {"name": "config", "parameter": []})
745 config_position = 0
746 # for VNF/VDU add verify-ssh-credentials after config
747 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
748 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
749 return primitive_list
750
751 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
752 n2vc_key_list, stage):
753 try:
754 db_nsr_update = {}
755 RO_descriptor_number = 0 # number of descriptors created at RO
756 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
757 nslcmop_id = db_nslcmop["_id"]
758 start_deploy = time()
759 ns_params = db_nslcmop.get("operationParams")
760 if ns_params and ns_params.get("timeout_ns_deploy"):
761 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
762 else:
763 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
764
765 # Check for and optionally request placement optimization. Database will be updated if placement activated
766 stage[2] = "Waiting for Placement."
767 await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
768
769 # deploy RO
770
771 # get vnfds, instantiate at RO
772 for c_vnf in nsd.get("constituent-vnfd", ()):
773 member_vnf_index = c_vnf["member-vnf-index"]
774 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
775 vnfd_ref = vnfd["id"]
776
777 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
778 db_nsr_update["detailed-status"] = " ".join(stage)
779 self.update_db_2("nsrs", nsr_id, db_nsr_update)
780 self._write_op_status(nslcmop_id, stage)
781
782 # self.logger.debug(logging_text + stage[2])
783 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
784 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
785 RO_descriptor_number += 1
786
787 # look position at deployed.RO.vnfd if not present it will be appended at the end
788 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
789 if vnf_deployed["member-vnf-index"] == member_vnf_index:
790 break
791 else:
792 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
793 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
794
795 # look if present
796 RO_update = {"member-vnf-index": member_vnf_index}
797 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
798 if vnfd_list:
799 RO_update["id"] = vnfd_list[0]["uuid"]
800 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
801 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
802 else:
803 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
804 get("additionalParamsForVnf"), nsr_id)
805 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
806 RO_update["id"] = desc["uuid"]
807 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
808 vnfd_ref, member_vnf_index, desc["uuid"]))
809 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
810 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
811
812 # create nsd at RO
813 nsd_ref = nsd["id"]
814
815 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
816 db_nsr_update["detailed-status"] = " ".join(stage)
817 self.update_db_2("nsrs", nsr_id, db_nsr_update)
818 self._write_op_status(nslcmop_id, stage)
819
820 # self.logger.debug(logging_text + stage[2])
821 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
822 RO_descriptor_number += 1
823 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
824 if nsd_list:
825 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
826 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
827 nsd_ref, RO_nsd_uuid))
828 else:
829 nsd_RO = deepcopy(nsd)
830 nsd_RO["id"] = RO_osm_nsd_id
831 nsd_RO.pop("_id", None)
832 nsd_RO.pop("_admin", None)
833 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
834 member_vnf_index = c_vnf["member-vnf-index"]
835 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
836 for c_vld in nsd_RO.get("vld", ()):
837 for cp in c_vld.get("vnfd-connection-point-ref", ()):
838 member_vnf_index = cp["member-vnf-index-ref"]
839 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
840
841 desc = await self.RO.create("nsd", descriptor=nsd_RO)
842 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
843 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
844 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
845 self.update_db_2("nsrs", nsr_id, db_nsr_update)
846
847 # Crate ns at RO
848 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
849 db_nsr_update["detailed-status"] = " ".join(stage)
850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
851 self._write_op_status(nslcmop_id, stage)
852
853 # if present use it unless in error status
854 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
855 if RO_nsr_id:
856 try:
857 stage[2] = "Looking for existing ns at RO"
858 db_nsr_update["detailed-status"] = " ".join(stage)
859 self.update_db_2("nsrs", nsr_id, db_nsr_update)
860 self._write_op_status(nslcmop_id, stage)
861 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
862 desc = await self.RO.show("ns", RO_nsr_id)
863
864 except ROclient.ROClientException as e:
865 if e.http_code != HTTPStatus.NOT_FOUND:
866 raise
867 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
868 if RO_nsr_id:
869 ns_status, ns_status_info = self.RO.check_ns_status(desc)
870 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
871 if ns_status == "ERROR":
872 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
873 self.logger.debug(logging_text + stage[2])
874 await self.RO.delete("ns", RO_nsr_id)
875 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
876 if not RO_nsr_id:
877 stage[2] = "Checking dependencies"
878 db_nsr_update["detailed-status"] = " ".join(stage)
879 self.update_db_2("nsrs", nsr_id, db_nsr_update)
880 self._write_op_status(nslcmop_id, stage)
881 # self.logger.debug(logging_text + stage[2])
882
883 # check if VIM is creating and wait look if previous tasks in process
884 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
885 if task_dependency:
886 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
887 self.logger.debug(logging_text + stage[2])
888 await asyncio.wait(task_dependency, timeout=3600)
889 if ns_params.get("vnf"):
890 for vnf in ns_params["vnf"]:
891 if "vimAccountId" in vnf:
892 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
893 vnf["vimAccountId"])
894 if task_dependency:
895 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
896 self.logger.debug(logging_text + stage[2])
897 await asyncio.wait(task_dependency, timeout=3600)
898
899 stage[2] = "Checking instantiation parameters."
900 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
901 stage[2] = "Deploying ns at VIM."
902 db_nsr_update["detailed-status"] = " ".join(stage)
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 self._write_op_status(nslcmop_id, stage)
905
906 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
907 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
908 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
909 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
910 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
911
912 # wait until NS is ready
913 stage[2] = "Waiting VIM to deploy ns."
914 db_nsr_update["detailed-status"] = " ".join(stage)
915 self.update_db_2("nsrs", nsr_id, db_nsr_update)
916 self._write_op_status(nslcmop_id, stage)
917 detailed_status_old = None
918 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
919
920 old_desc = None
921 while time() <= start_deploy + timeout_ns_deploy:
922 desc = await self.RO.show("ns", RO_nsr_id)
923
924 # deploymentStatus
925 if desc != old_desc:
926 # desc has changed => update db
927 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
928 old_desc = desc
929
930 ns_status, ns_status_info = self.RO.check_ns_status(desc)
931 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
932 if ns_status == "ERROR":
933 raise ROclient.ROClientException(ns_status_info)
934 elif ns_status == "BUILD":
935 stage[2] = "VIM: ({})".format(ns_status_info)
936 elif ns_status == "ACTIVE":
937 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
938 try:
939 self.ns_update_vnfr(db_vnfrs, desc)
940 break
941 except LcmExceptionNoMgmtIP:
942 pass
943 else:
944 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
945 if stage[2] != detailed_status_old:
946 detailed_status_old = stage[2]
947 db_nsr_update["detailed-status"] = " ".join(stage)
948 self.update_db_2("nsrs", nsr_id, db_nsr_update)
949 self._write_op_status(nslcmop_id, stage)
950 await asyncio.sleep(5, loop=self.loop)
951 else: # timeout_ns_deploy
952 raise ROclient.ROClientException("Timeout waiting ns to be ready")
953
954 # Updating NSR
955 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
956
957 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
958 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
959 stage[2] = "Deployed at VIM"
960 db_nsr_update["detailed-status"] = " ".join(stage)
961 self.update_db_2("nsrs", nsr_id, db_nsr_update)
962 self._write_op_status(nslcmop_id, stage)
963 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
964 # self.logger.debug(logging_text + "Deployed at VIM")
965 except (ROclient.ROClientException, LcmException, DbException) as e:
966 stage[2] = "ERROR deployig at VIM"
967 self.set_vnfr_at_error(db_vnfrs, str(e))
968 raise
969
970 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
971 """
972 Wait for ip addres at RO, and optionally, insert public key in virtual machine
973 :param logging_text: prefix use for logging
974 :param nsr_id:
975 :param vnfr_id:
976 :param vdu_id:
977 :param vdu_index:
978 :param pub_key: public ssh key to inject, None to skip
979 :param user: user to apply the public ssh key
980 :return: IP address
981 """
982
983 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
984 ro_nsr_id = None
985 ip_address = None
986 nb_tries = 0
987 target_vdu_id = None
988 ro_retries = 0
989
990 while True:
991
992 ro_retries += 1
993 if ro_retries >= 360: # 1 hour
994 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
995
996 await asyncio.sleep(10, loop=self.loop)
997
998 # get ip address
999 if not target_vdu_id:
1000 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1001
1002 if not vdu_id: # for the VNF case
1003 if db_vnfr.get("status") == "ERROR":
1004 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1005 ip_address = db_vnfr.get("ip-address")
1006 if not ip_address:
1007 continue
1008 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1009 else: # VDU case
1010 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1011 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1012
1013 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1014 vdur = db_vnfr["vdur"][0]
1015 if not vdur:
1016 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1017 vdu_index))
1018
1019 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1020 ip_address = vdur.get("ip-address")
1021 if not ip_address:
1022 continue
1023 target_vdu_id = vdur["vdu-id-ref"]
1024 elif vdur.get("status") == "ERROR":
1025 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1026
1027 if not target_vdu_id:
1028 continue
1029
1030 # inject public key into machine
1031 if pub_key and user:
1032 # wait until NS is deployed at RO
1033 if not ro_nsr_id:
1034 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1035 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1036 if not ro_nsr_id:
1037 continue
1038
1039 # self.logger.debug(logging_text + "Inserting RO key")
1040 if vdur.get("pdu-type"):
1041 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1042 return ip_address
1043 try:
1044 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1045 result_dict = await self.RO.create_action(
1046 item="ns",
1047 item_id_name=ro_nsr_id,
1048 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1049 )
1050 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1051 if not result_dict or not isinstance(result_dict, dict):
1052 raise LcmException("Unknown response from RO when injecting key")
1053 for result in result_dict.values():
1054 if result.get("vim_result") == 200:
1055 break
1056 else:
1057 raise ROclient.ROClientException("error injecting key: {}".format(
1058 result.get("description")))
1059 break
1060 except ROclient.ROClientException as e:
1061 if not nb_tries:
1062 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1063 format(e, 20*10))
1064 nb_tries += 1
1065 if nb_tries >= 20:
1066 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1067 else:
1068 break
1069
1070 return ip_address
1071
1072 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1073 """
1074 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1075 """
1076 my_vca = vca_deployed_list[vca_index]
1077 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1078 # vdu or kdu: no dependencies
1079 return
1080 timeout = 300
1081 while timeout >= 0:
1082 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1083 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1084 configuration_status_list = db_nsr["configurationStatus"]
1085 for index, vca_deployed in enumerate(configuration_status_list):
1086 if index == vca_index:
1087 # myself
1088 continue
1089 if not my_vca.get("member-vnf-index") or \
1090 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1091 internal_status = configuration_status_list[index].get("status")
1092 if internal_status == 'READY':
1093 continue
1094 elif internal_status == 'BROKEN':
1095 raise LcmException("Configuration aborted because dependent charm/s has failed")
1096 else:
1097 break
1098 else:
1099 # no dependencies, return
1100 return
1101 await asyncio.sleep(10)
1102 timeout -= 1
1103
1104 raise LcmException("Configuration aborted because dependent charm/s timeout")
1105
1106 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1107 config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
1108 nsr_id = db_nsr["_id"]
1109 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1110 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1111 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1112 db_dict = {
1113 'collection': 'nsrs',
1114 'filter': {'_id': nsr_id},
1115 'path': db_update_entry
1116 }
1117 step = ""
1118 try:
1119
1120 element_type = 'NS'
1121 element_under_configuration = nsr_id
1122
1123 vnfr_id = None
1124 if db_vnfr:
1125 vnfr_id = db_vnfr["_id"]
1126
1127 namespace = "{nsi}.{ns}".format(
1128 nsi=nsi_id if nsi_id else "",
1129 ns=nsr_id)
1130
1131 if vnfr_id:
1132 element_type = 'VNF'
1133 element_under_configuration = vnfr_id
1134 namespace += ".{}".format(vnfr_id)
1135 if vdu_id:
1136 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1137 element_type = 'VDU'
1138 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1139
1140 # Get artifact path
1141 self.fs.sync() # Sync from FSMongo
1142 artifact_path = "{}/{}/charms/{}".format(
1143 base_folder["folder"],
1144 base_folder["pkg-dir"],
1145 config_descriptor["juju"]["charm"]
1146 )
1147
1148 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1149 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1150 is_proxy_charm = False
1151
1152 # n2vc_redesign STEP 3.1
1153
1154 # find old ee_id if exists
1155 ee_id = vca_deployed.get("ee_id")
1156
1157 # create or register execution environment in VCA
1158 if is_proxy_charm:
1159
1160 self._write_configuration_status(
1161 nsr_id=nsr_id,
1162 vca_index=vca_index,
1163 status='CREATING',
1164 element_under_configuration=element_under_configuration,
1165 element_type=element_type
1166 )
1167
1168 step = "create execution environment"
1169 self.logger.debug(logging_text + step)
1170 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1171 reuse_ee_id=ee_id,
1172 db_dict=db_dict)
1173
1174 else:
1175 step = "Waiting to VM being up and getting IP address"
1176 self.logger.debug(logging_text + step)
1177 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1178 user=None, pub_key=None)
1179 credentials = {"hostname": rw_mgmt_ip}
1180 # get username
1181 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1182 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1183 # merged. Meanwhile let's get username from initial-config-primitive
1184 if not username and config_descriptor.get("initial-config-primitive"):
1185 for config_primitive in config_descriptor["initial-config-primitive"]:
1186 for param in config_primitive.get("parameter", ()):
1187 if param["name"] == "ssh-username":
1188 username = param["value"]
1189 break
1190 if not username:
1191 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1192 "'config-access.ssh-access.default-user'")
1193 credentials["username"] = username
1194 # n2vc_redesign STEP 3.2
1195
1196 self._write_configuration_status(
1197 nsr_id=nsr_id,
1198 vca_index=vca_index,
1199 status='REGISTERING',
1200 element_under_configuration=element_under_configuration,
1201 element_type=element_type
1202 )
1203
1204 step = "register execution environment {}".format(credentials)
1205 self.logger.debug(logging_text + step)
1206 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1207 db_dict=db_dict)
1208
1209 # for compatibility with MON/POL modules, the need model and application name at database
1210 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1211 ee_id_parts = ee_id.split('.')
1212 model_name = ee_id_parts[0]
1213 application_name = ee_id_parts[1]
1214 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1215 db_update_entry + "application": application_name,
1216 db_update_entry + "ee_id": ee_id})
1217
1218 # n2vc_redesign STEP 3.3
1219
1220 step = "Install configuration Software"
1221
1222 self._write_configuration_status(
1223 nsr_id=nsr_id,
1224 vca_index=vca_index,
1225 status='INSTALLING SW',
1226 element_under_configuration=element_under_configuration,
1227 element_type=element_type
1228 )
1229
1230 # TODO check if already done
1231 self.logger.debug(logging_text + step)
1232 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1233
1234 # write in db flag of configuration_sw already installed
1235 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1236
1237 # add relations for this VCA (wait for other peers related with this VCA)
1238 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)
1239
1240 # if SSH access is required, then get execution environment SSH public
1241 if is_proxy_charm: # if native charm we have waited already to VM be UP
1242 pub_key = None
1243 user = None
1244 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1245 # Needed to inject a ssh key
1246 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1247 step = "Install configuration Software, getting public ssh key"
1248 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1249
1250 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1251 else:
1252 step = "Waiting to VM being up and getting IP address"
1253 self.logger.debug(logging_text + step)
1254
1255 # n2vc_redesign STEP 5.1
1256 # wait for RO (ip-address) Insert pub_key into VM
1257 if vnfr_id:
1258 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1259 user=user, pub_key=pub_key)
1260 else:
1261 rw_mgmt_ip = None # This is for a NS configuration
1262
1263 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1264
1265 # store rw_mgmt_ip in deploy params for later replacement
1266 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1267
1268 # n2vc_redesign STEP 6 Execute initial config primitive
1269 step = 'execute initial config primitive'
1270 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1271
1272 # sort initial config primitives by 'seq'
1273 if initial_config_primitive_list:
1274 try:
1275 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1276 except Exception as e:
1277 self.logger.error(logging_text + step + ": " + str(e))
1278 else:
1279 self.logger.debug(logging_text + step + ": No initial-config-primitive")
1280
1281 # add config if not present for NS charm
1282 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1283 vca_deployed)
1284
1285 # wait for dependent primitives execution (NS -> VNF -> VDU)
1286 if initial_config_primitive_list:
1287 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1288
1289 # stage, in function of element type: vdu, kdu, vnf or ns
1290 my_vca = vca_deployed_list[vca_index]
1291 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1292 # VDU or KDU
1293 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1294 elif my_vca.get("member-vnf-index"):
1295 # VNF
1296 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1297 else:
1298 # NS
1299 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1300
1301 self._write_configuration_status(
1302 nsr_id=nsr_id,
1303 vca_index=vca_index,
1304 status='EXECUTING PRIMITIVE'
1305 )
1306
1307 self._write_op_status(
1308 op_id=nslcmop_id,
1309 stage=stage
1310 )
1311
1312 check_if_terminated_needed = True
1313 for initial_config_primitive in initial_config_primitive_list:
1314 # adding information on the vca_deployed if it is a NS execution environment
1315 if not vca_deployed["member-vnf-index"]:
1316 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1317 # TODO check if already done
1318 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1319
1320 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1321 self.logger.debug(logging_text + step)
1322 await self.n2vc.exec_primitive(
1323 ee_id=ee_id,
1324 primitive_name=initial_config_primitive["name"],
1325 params_dict=primitive_params_,
1326 db_dict=db_dict
1327 )
1328 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1329 if check_if_terminated_needed:
1330 if config_descriptor.get('terminate-config-primitive'):
1331 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1332 check_if_terminated_needed = False
1333
1334 # TODO register in database that primitive is done
1335
1336 step = "instantiated at VCA"
1337 self.logger.debug(logging_text + step)
1338
1339 self._write_configuration_status(
1340 nsr_id=nsr_id,
1341 vca_index=vca_index,
1342 status='READY'
1343 )
1344
1345 except Exception as e: # TODO not use Exception but N2VC exception
1346 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1347 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1348 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1349 self._write_configuration_status(
1350 nsr_id=nsr_id,
1351 vca_index=vca_index,
1352 status='BROKEN'
1353 )
1354 raise LcmException("{} {}".format(step, e)) from e
1355
1356 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1357 error_description: str = None, error_detail: str = None, other_update: dict = None):
1358 """
1359 Update db_nsr fields.
1360 :param nsr_id:
1361 :param ns_state:
1362 :param current_operation:
1363 :param current_operation_id:
1364 :param error_description:
1365 :param error_detail:
1366 :param other_update: Other required changes at database if provided, will be cleared
1367 :return:
1368 """
1369 try:
1370 db_dict = other_update or {}
1371 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1372 db_dict["_admin.current-operation"] = current_operation_id
1373 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1374 db_dict["currentOperation"] = current_operation
1375 db_dict["currentOperationID"] = current_operation_id
1376 db_dict["errorDescription"] = error_description
1377 db_dict["errorDetail"] = error_detail
1378
1379 if ns_state:
1380 db_dict["nsState"] = ns_state
1381 self.update_db_2("nsrs", nsr_id, db_dict)
1382 except DbException as e:
1383 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1384
1385 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1386 operation_state: str = None, other_update: dict = None):
1387 try:
1388 db_dict = other_update or {}
1389 db_dict['queuePosition'] = queuePosition
1390 if isinstance(stage, list):
1391 db_dict['stage'] = stage[0]
1392 db_dict['detailed-status'] = " ".join(stage)
1393 elif stage is not None:
1394 db_dict['stage'] = str(stage)
1395
1396 if error_message is not None:
1397 db_dict['errorMessage'] = error_message
1398 if operation_state is not None:
1399 db_dict['operationState'] = operation_state
1400 db_dict["statusEnteredTime"] = time()
1401 self.update_db_2("nslcmops", op_id, db_dict)
1402 except DbException as e:
1403 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1404
1405 def _write_all_config_status(self, nsr_id: str, status: str):
1406 try:
1407 # nsrs record
1408 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1409 # configurationStatus
1410 config_status = db_nsr.get('configurationStatus')
1411 if config_status:
1412 # update status
1413 db_dict = dict()
1414 db_dict['configurationStatus'] = list()
1415 for c in config_status:
1416 c['status'] = status
1417 db_dict['configurationStatus'].append(c)
1418 self.update_db_2("nsrs", nsr_id, db_dict)
1419
1420 except DbException as e:
1421 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1422
1423 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1424 element_under_configuration: str = None, element_type: str = None):
1425
1426 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1427 # .format(vca_index, status))
1428
1429 try:
1430 db_path = 'configurationStatus.{}.'.format(vca_index)
1431 db_dict = dict()
1432 if status:
1433 db_dict[db_path + 'status'] = status
1434 if element_under_configuration:
1435 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1436 if element_type:
1437 db_dict[db_path + 'elementType'] = element_type
1438 self.update_db_2("nsrs", nsr_id, db_dict)
1439 except DbException as e:
1440 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1441 .format(status, nsr_id, vca_index, e))
1442
1443 async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1444 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1445 if placement_engine == "PLA":
1446 self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
1447 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
1448 db_poll_interval = 5
1449 wait = db_poll_interval * 4
1450 pla_result = None
1451 while not pla_result and wait >= 0:
1452 await asyncio.sleep(db_poll_interval)
1453 wait -= db_poll_interval
1454 db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
1455 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1456
1457 if not pla_result:
1458 raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
1459
1460 for pla_vnf in pla_result['vnf']:
1461 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1462 if not pla_vnf.get('vimAccountId') or not vnfr:
1463 continue
1464 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1465 return
1466
1467 def update_nsrs_with_pla_result(self, params):
1468 try:
1469 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1470 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1471 except Exception as e:
1472 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1473
1474 async def instantiate(self, nsr_id, nslcmop_id):
1475 """
1476
1477 :param nsr_id: ns instance to deploy
1478 :param nslcmop_id: operation to run
1479 :return:
1480 """
1481
1482 # Try to lock HA task here
1483 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1484 if not task_is_locked_by_me:
1485 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1486 return
1487
1488 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1489 self.logger.debug(logging_text + "Enter")
1490
1491 # get all needed from database
1492
1493 # database nsrs record
1494 db_nsr = None
1495
1496 # database nslcmops record
1497 db_nslcmop = None
1498
1499 # update operation on nsrs
1500 db_nsr_update = {}
1501 # update operation on nslcmops
1502 db_nslcmop_update = {}
1503
1504 nslcmop_operation_state = None
1505 db_vnfrs = {} # vnf's info indexed by member-index
1506 # n2vc_info = {}
1507 tasks_dict_info = {} # from task to info text
1508 exc = None
1509 error_list = []
1510 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1511 # ^ stage, step, VIM progress
1512 try:
1513 # wait for any previous tasks in process
1514 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1515
1516 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1517 stage[1] = "Reading from database,"
1518 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1519 db_nsr_update["detailed-status"] = "creating"
1520 db_nsr_update["operational-status"] = "init"
1521 self._write_ns_status(
1522 nsr_id=nsr_id,
1523 ns_state="BUILDING",
1524 current_operation="INSTANTIATING",
1525 current_operation_id=nslcmop_id,
1526 other_update=db_nsr_update
1527 )
1528 self._write_op_status(
1529 op_id=nslcmop_id,
1530 stage=stage,
1531 queuePosition=0
1532 )
1533
1534 # read from db: operation
1535 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1536 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1537 ns_params = db_nslcmop.get("operationParams")
1538 if ns_params and ns_params.get("timeout_ns_deploy"):
1539 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1540 else:
1541 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1542
1543 # read from db: ns
1544 stage[1] = "Getting nsr={} from db".format(nsr_id)
1545 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1546 # nsd is replicated into ns (no db read)
1547 nsd = db_nsr["nsd"]
1548 # nsr_name = db_nsr["name"] # TODO short-name??
1549
1550 # read from db: vnf's of this ns
1551 stage[1] = "Getting vnfrs from db"
1552 self.logger.debug(logging_text + stage[1])
1553 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1554
1555 # read from db: vnfd's for every vnf
1556 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1557 db_vnfds = {} # every vnfd data indexed by vnf id
1558 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1559
1560 # for each vnf in ns, read vnfd
1561 for vnfr in db_vnfrs_list:
1562 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1563 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1564 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1565 # if we haven't this vnfd, read it from db
1566 if vnfd_id not in db_vnfds:
1567 # read from db
1568 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1569 self.logger.debug(logging_text + stage[1])
1570 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1571
1572 # store vnfd
1573 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1574 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1575 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1576
1577 # Get or generates the _admin.deployed.VCA list
1578 vca_deployed_list = None
1579 if db_nsr["_admin"].get("deployed"):
1580 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1581 if vca_deployed_list is None:
1582 vca_deployed_list = []
1583 configuration_status_list = []
1584 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1585 db_nsr_update["configurationStatus"] = configuration_status_list
1586 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1587 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1588 elif isinstance(vca_deployed_list, dict):
1589 # maintain backward compatibility. Change a dict to list at database
1590 vca_deployed_list = list(vca_deployed_list.values())
1591 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1592 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1593
1594 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1595 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1596 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1597
1598 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1599 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1600 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1601
1602 # n2vc_redesign STEP 2 Deploy Network Scenario
1603 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1604 self._write_op_status(
1605 op_id=nslcmop_id,
1606 stage=stage
1607 )
1608
1609 stage[1] = "Deploying KDUs,"
1610 # self.logger.debug(logging_text + "Before deploy_kdus")
1611 # Call to deploy_kdus in case exists the "vdu:kdu" param
1612 await self.deploy_kdus(
1613 logging_text=logging_text,
1614 nsr_id=nsr_id,
1615 nslcmop_id=nslcmop_id,
1616 db_vnfrs=db_vnfrs,
1617 db_vnfds=db_vnfds,
1618 task_instantiation_info=tasks_dict_info,
1619 )
1620
1621 stage[1] = "Getting VCA public key."
1622 # n2vc_redesign STEP 1 Get VCA public ssh-key
1623 # feature 1429. Add n2vc public key to needed VMs
1624 n2vc_key = self.n2vc.get_public_key()
1625 n2vc_key_list = [n2vc_key]
1626 if self.vca_config.get("public_key"):
1627 n2vc_key_list.append(self.vca_config["public_key"])
1628
1629 stage[1] = "Deploying NS at VIM."
1630 task_ro = asyncio.ensure_future(
1631 self.instantiate_RO(
1632 logging_text=logging_text,
1633 nsr_id=nsr_id,
1634 nsd=nsd,
1635 db_nsr=db_nsr,
1636 db_nslcmop=db_nslcmop,
1637 db_vnfrs=db_vnfrs,
1638 db_vnfds_ref=db_vnfds_ref,
1639 n2vc_key_list=n2vc_key_list,
1640 stage=stage
1641 )
1642 )
1643 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1644 tasks_dict_info[task_ro] = "Deploying at VIM"
1645
1646 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1647 stage[1] = "Deploying Execution Environments."
1648 self.logger.debug(logging_text + stage[1])
1649
1650 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1651 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1652 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1653 vnfd_id = c_vnf["vnfd-id-ref"]
1654 vnfd = db_vnfds_ref[vnfd_id]
1655 member_vnf_index = str(c_vnf["member-vnf-index"])
1656 db_vnfr = db_vnfrs[member_vnf_index]
1657 base_folder = vnfd["_admin"]["storage"]
1658 vdu_id = None
1659 vdu_index = 0
1660 vdu_name = None
1661 kdu_name = None
1662
1663 # Get additional parameters
1664 deploy_params = {}
1665 if db_vnfr.get("additionalParamsForVnf"):
1666 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1667
1668 descriptor_config = vnfd.get("vnf-configuration")
1669 if descriptor_config and descriptor_config.get("juju"):
1670 self._deploy_n2vc(
1671 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1672 db_nsr=db_nsr,
1673 db_vnfr=db_vnfr,
1674 nslcmop_id=nslcmop_id,
1675 nsr_id=nsr_id,
1676 nsi_id=nsi_id,
1677 vnfd_id=vnfd_id,
1678 vdu_id=vdu_id,
1679 kdu_name=kdu_name,
1680 member_vnf_index=member_vnf_index,
1681 vdu_index=vdu_index,
1682 vdu_name=vdu_name,
1683 deploy_params=deploy_params,
1684 descriptor_config=descriptor_config,
1685 base_folder=base_folder,
1686 task_instantiation_info=tasks_dict_info,
1687 stage=stage
1688 )
1689
1690 # Deploy charms for each VDU that supports one.
1691 for vdud in get_iterable(vnfd, 'vdu'):
1692 vdu_id = vdud["id"]
1693 descriptor_config = vdud.get('vdu-configuration')
1694 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1695 if vdur.get("additionalParams"):
1696 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1697 else:
1698 deploy_params_vdu = deploy_params
1699 if descriptor_config and descriptor_config.get("juju"):
1700 # look for vdu index in the db_vnfr["vdu"] section
1701 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1702 # if vdur["vdu-id-ref"] == vdu_id:
1703 # break
1704 # else:
1705 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1706 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1707 # vdu_name = vdur.get("name")
1708 vdu_name = None
1709 kdu_name = None
1710 for vdu_index in range(int(vdud.get("count", 1))):
1711 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1712 self._deploy_n2vc(
1713 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1714 member_vnf_index, vdu_id, vdu_index),
1715 db_nsr=db_nsr,
1716 db_vnfr=db_vnfr,
1717 nslcmop_id=nslcmop_id,
1718 nsr_id=nsr_id,
1719 nsi_id=nsi_id,
1720 vnfd_id=vnfd_id,
1721 vdu_id=vdu_id,
1722 kdu_name=kdu_name,
1723 member_vnf_index=member_vnf_index,
1724 vdu_index=vdu_index,
1725 vdu_name=vdu_name,
1726 deploy_params=deploy_params_vdu,
1727 descriptor_config=descriptor_config,
1728 base_folder=base_folder,
1729 task_instantiation_info=tasks_dict_info
1730 )
1731 for kdud in get_iterable(vnfd, 'kdu'):
1732 kdu_name = kdud["name"]
1733 descriptor_config = kdud.get('kdu-configuration')
1734 if descriptor_config and descriptor_config.get("juju"):
1735 vdu_id = None
1736 vdu_index = 0
1737 vdu_name = None
1738 # look for vdu index in the db_vnfr["vdu"] section
1739 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1740 # if vdur["vdu-id-ref"] == vdu_id:
1741 # break
1742 # else:
1743 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1744 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1745 # vdu_name = vdur.get("name")
1746 # vdu_name = None
1747
1748 self._deploy_n2vc(
1749 logging_text=logging_text,
1750 db_nsr=db_nsr,
1751 db_vnfr=db_vnfr,
1752 nslcmop_id=nslcmop_id,
1753 nsr_id=nsr_id,
1754 nsi_id=nsi_id,
1755 vnfd_id=vnfd_id,
1756 vdu_id=vdu_id,
1757 kdu_name=kdu_name,
1758 member_vnf_index=member_vnf_index,
1759 vdu_index=vdu_index,
1760 vdu_name=vdu_name,
1761 deploy_params=deploy_params,
1762 descriptor_config=descriptor_config,
1763 base_folder=base_folder,
1764 task_instantiation_info=tasks_dict_info
1765 )
1766
1767 # Check if this NS has a charm configuration
1768 descriptor_config = nsd.get("ns-configuration")
1769 if descriptor_config and descriptor_config.get("juju"):
1770 vnfd_id = None
1771 db_vnfr = None
1772 member_vnf_index = None
1773 vdu_id = None
1774 kdu_name = None
1775 vdu_index = 0
1776 vdu_name = None
1777
1778 # Get additional parameters
1779 deploy_params = {}
1780 if db_nsr.get("additionalParamsForNs"):
1781 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1782 base_folder = nsd["_admin"]["storage"]
1783 self._deploy_n2vc(
1784 logging_text=logging_text,
1785 db_nsr=db_nsr,
1786 db_vnfr=db_vnfr,
1787 nslcmop_id=nslcmop_id,
1788 nsr_id=nsr_id,
1789 nsi_id=nsi_id,
1790 vnfd_id=vnfd_id,
1791 vdu_id=vdu_id,
1792 kdu_name=kdu_name,
1793 member_vnf_index=member_vnf_index,
1794 vdu_index=vdu_index,
1795 vdu_name=vdu_name,
1796 deploy_params=deploy_params,
1797 descriptor_config=descriptor_config,
1798 base_folder=base_folder,
1799 task_instantiation_info=tasks_dict_info
1800 )
1801
1802 # rest of staff will be done at finally
1803
1804 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1805 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1806 exc = e
1807 except asyncio.CancelledError:
1808 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1809 exc = "Operation was cancelled"
1810 except Exception as e:
1811 exc = traceback.format_exc()
1812 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1813 finally:
1814 if exc:
1815 error_list.append(str(exc))
1816 try:
1817 # wait for pending tasks
1818 if tasks_dict_info:
1819 stage[1] = "Waiting for instantiate pending tasks."
1820 self.logger.debug(logging_text + stage[1])
1821 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1822 stage, nslcmop_id, nsr_id=nsr_id)
1823 stage[1] = stage[2] = ""
1824 except asyncio.CancelledError:
1825 error_list.append("Cancelled")
1826 # TODO cancel all tasks
1827 except Exception as exc:
1828 error_list.append(str(exc))
1829
1830 # update operation-status
1831 db_nsr_update["operational-status"] = "running"
1832 # let's begin with VCA 'configured' status (later we can change it)
1833 db_nsr_update["config-status"] = "configured"
1834 for task, task_name in tasks_dict_info.items():
1835 if not task.done() or task.cancelled() or task.exception():
1836 if task_name.startswith(self.task_name_deploy_vca):
1837 # A N2VC task is pending
1838 db_nsr_update["config-status"] = "failed"
1839 else:
1840 # RO or KDU task is pending
1841 db_nsr_update["operational-status"] = "failed"
1842
1843 # update status at database
1844 if error_list:
1845 error_detail = ". ".join(error_list)
1846 self.logger.error(logging_text + error_detail)
1847 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1848 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1849
1850 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1851 db_nslcmop_update["detailed-status"] = error_detail
1852 nslcmop_operation_state = "FAILED"
1853 ns_state = "BROKEN"
1854 else:
1855 error_detail = None
1856 error_description_nsr = error_description_nslcmop = None
1857 ns_state = "READY"
1858 db_nsr_update["detailed-status"] = "Done"
1859 db_nslcmop_update["detailed-status"] = "Done"
1860 nslcmop_operation_state = "COMPLETED"
1861
1862 if db_nsr:
1863 self._write_ns_status(
1864 nsr_id=nsr_id,
1865 ns_state=ns_state,
1866 current_operation="IDLE",
1867 current_operation_id=None,
1868 error_description=error_description_nsr,
1869 error_detail=error_detail,
1870 other_update=db_nsr_update
1871 )
1872 if db_nslcmop:
1873 self._write_op_status(
1874 op_id=nslcmop_id,
1875 stage="",
1876 error_message=error_description_nslcmop,
1877 operation_state=nslcmop_operation_state,
1878 other_update=db_nslcmop_update,
1879 )
1880
1881 if nslcmop_operation_state:
1882 try:
1883 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1884 "operationState": nslcmop_operation_state},
1885 loop=self.loop)
1886 except Exception as e:
1887 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1888
1889 self.logger.debug(logging_text + "Exit")
1890 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1891
1892 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
1893
1894 # steps:
1895 # 1. find all relations for this VCA
1896 # 2. wait for other peers related
1897 # 3. add relations
1898
1899 try:
1900
1901 # STEP 1: find all relations for this VCA
1902
1903 # read nsr record
1904 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1905
1906 # this VCA data
1907 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
1908
1909 # read all ns-configuration relations
1910 ns_relations = list()
1911 db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
1912 if db_ns_relations:
1913 for r in db_ns_relations:
1914 # check if this VCA is in the relation
1915 if my_vca.get('member-vnf-index') in\
1916 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1917 ns_relations.append(r)
1918
1919 # read all vnf-configuration relations
1920 vnf_relations = list()
1921 db_vnfd_list = db_nsr.get('vnfd-id')
1922 if db_vnfd_list:
1923 for vnfd in db_vnfd_list:
1924 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
1925 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
1926 if db_vnf_relations:
1927 for r in db_vnf_relations:
1928 # check if this VCA is in the relation
1929 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1930 vnf_relations.append(r)
1931
1932 # if no relations, terminate
1933 if not ns_relations and not vnf_relations:
1934 self.logger.debug(logging_text + ' No relations')
1935 return True
1936
1937 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
1938
1939 # add all relations
1940 start = time()
1941 while True:
1942 # check timeout
1943 now = time()
1944 if now - start >= timeout:
1945 self.logger.error(logging_text + ' : timeout adding relations')
1946 return False
1947
1948 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
1949 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1950
1951 # for each defined NS relation, find the VCA's related
1952 for r in ns_relations:
1953 from_vca_ee_id = None
1954 to_vca_ee_id = None
1955 from_vca_endpoint = None
1956 to_vca_endpoint = None
1957 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
1958 for vca in vca_list:
1959 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
1960 and vca.get('config_sw_installed'):
1961 from_vca_ee_id = vca.get('ee_id')
1962 from_vca_endpoint = r.get('entities')[0].get('endpoint')
1963 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
1964 and vca.get('config_sw_installed'):
1965 to_vca_ee_id = vca.get('ee_id')
1966 to_vca_endpoint = r.get('entities')[1].get('endpoint')
1967 if from_vca_ee_id and to_vca_ee_id:
1968 # add relation
1969 await self.n2vc.add_relation(
1970 ee_id_1=from_vca_ee_id,
1971 ee_id_2=to_vca_ee_id,
1972 endpoint_1=from_vca_endpoint,
1973 endpoint_2=to_vca_endpoint)
1974 # remove entry from relations list
1975 ns_relations.remove(r)
1976 else:
1977 # check failed peers
1978 try:
1979 vca_status_list = db_nsr.get('configurationStatus')
1980 if vca_status_list:
1981 for i in range(len(vca_list)):
1982 vca = vca_list[i]
1983 vca_status = vca_status_list[i]
1984 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
1985 if vca_status.get('status') == 'BROKEN':
1986 # peer broken: remove relation from list
1987 ns_relations.remove(r)
1988 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
1989 if vca_status.get('status') == 'BROKEN':
1990 # peer broken: remove relation from list
1991 ns_relations.remove(r)
1992 except Exception:
1993 # ignore
1994 pass
1995
1996 # for each defined VNF relation, find the VCA's related
1997 for r in vnf_relations:
1998 from_vca_ee_id = None
1999 to_vca_ee_id = None
2000 from_vca_endpoint = None
2001 to_vca_endpoint = None
2002 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2003 for vca in vca_list:
2004 if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2005 from_vca_ee_id = vca.get('ee_id')
2006 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2007 if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2008 to_vca_ee_id = vca.get('ee_id')
2009 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2010 if from_vca_ee_id and to_vca_ee_id:
2011 # add relation
2012 await self.n2vc.add_relation(
2013 ee_id_1=from_vca_ee_id,
2014 ee_id_2=to_vca_ee_id,
2015 endpoint_1=from_vca_endpoint,
2016 endpoint_2=to_vca_endpoint)
2017 # remove entry from relations list
2018 vnf_relations.remove(r)
2019 else:
2020 # check failed peers
2021 try:
2022 vca_status_list = db_nsr.get('configurationStatus')
2023 if vca_status_list:
2024 for i in range(len(vca_list)):
2025 vca = vca_list[i]
2026 vca_status = vca_status_list[i]
2027 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2028 if vca_status.get('status') == 'BROKEN':
2029 # peer broken: remove relation from list
2030 ns_relations.remove(r)
2031 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2032 if vca_status.get('status') == 'BROKEN':
2033 # peer broken: remove relation from list
2034 ns_relations.remove(r)
2035 except Exception:
2036 # ignore
2037 pass
2038
2039 # wait for next try
2040 await asyncio.sleep(5.0)
2041
2042 if not ns_relations and not vnf_relations:
2043 self.logger.debug('Relations added')
2044 break
2045
2046 return True
2047
2048 except Exception as e:
2049 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2050 return False
2051
2052 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2053 # Launch kdus if present in the descriptor
2054
2055 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
2056
2057 def _get_cluster_id(cluster_id, cluster_type):
2058 nonlocal k8scluster_id_2_uuic
2059 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2060 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2061
2062 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2063 if not db_k8scluster:
2064 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2065 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2066 if not k8s_id:
2067 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
2068 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2069 return k8s_id
2070
2071 logging_text += "Deploy kdus: "
2072 step = ""
2073 try:
2074 db_nsr_update = {"_admin.deployed.K8s": []}
2075 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2076
2077 index = 0
2078 updated_cluster_list = []
2079
2080 for vnfr_data in db_vnfrs.values():
2081 for kdur in get_iterable(vnfr_data, "kdur"):
2082 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2083 vnfd_id = vnfr_data.get('vnfd-id')
2084 pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
2085 if kdur.get("helm-chart"):
2086 kdumodel = kdur["helm-chart"]
2087 k8sclustertype = "helm-chart"
2088 elif kdur.get("juju-bundle"):
2089 kdumodel = kdur["juju-bundle"]
2090 k8sclustertype = "juju-bundle"
2091 else:
2092 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2093 "juju-bundle. Maybe an old NBI version is running".
2094 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2095 # check if kdumodel is a file and exists
2096 try:
2097 # path format: /vnfdid/pkkdir/kdumodel
2098 filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
2099 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2100 kdumodel = self.fs.path + filename
2101 except asyncio.CancelledError:
2102 raise
2103 except Exception: # it is not a file
2104 pass
2105
2106 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2107 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2108 cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
2109
2110 if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
2111 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2112 self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
2113 if del_repo_list or added_repo_dict:
2114 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2115 updated = {'_admin.helm_charts_added.' +
2116 item: name for item, name in added_repo_dict.items()}
2117 self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
2118 "to_add: {}".format(k8s_cluster_id, del_repo_list,
2119 added_repo_dict))
2120 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2121 updated_cluster_list.append(cluster_uuid)
2122
2123 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2124 kdur["kdu-name"], k8s_cluster_id)
2125
2126 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
2127 "k8scluster-type": k8sclustertype,
2128 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
2129 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
2130 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2131
2132 db_dict = {"collection": "nsrs",
2133 "filter": {"_id": nsr_id},
2134 "path": "_admin.deployed.K8s.{}".format(index)}
2135
2136 task = asyncio.ensure_future(
2137 self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
2138 atomic=True, params=desc_params,
2139 db_dict=db_dict, timeout=600,
2140 kdu_name=kdur["kdu-name"]))
2141
2142 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2143 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2144
2145 index += 1
2146
2147 except (LcmException, asyncio.CancelledError):
2148 raise
2149 except Exception as e:
2150 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2151 if isinstance(e, (N2VCException, DbException)):
2152 self.logger.error(logging_text + msg)
2153 else:
2154 self.logger.critical(logging_text + msg, exc_info=True)
2155 raise LcmException(msg)
2156 finally:
2157 if db_nsr_update:
2158 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2159
2160 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2161 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2162 base_folder, task_instantiation_info, stage):
2163 # launch instantiate_N2VC in a asyncio task and register task object
2164 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2165 # if not found, create one entry and update database
2166
2167 # fill db_nsr._admin.deployed.VCA.<index>
2168 vca_index = -1
2169 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2170 if not vca_deployed:
2171 continue
2172 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2173 vca_deployed.get("vdu_id") == vdu_id and \
2174 vca_deployed.get("kdu_name") == kdu_name and \
2175 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2176 break
2177 else:
2178 # not found, create one.
2179 vca_deployed = {
2180 "member-vnf-index": member_vnf_index,
2181 "vdu_id": vdu_id,
2182 "kdu_name": kdu_name,
2183 "vdu_count_index": vdu_index,
2184 "operational-status": "init", # TODO revise
2185 "detailed-status": "", # TODO revise
2186 "step": "initial-deploy", # TODO revise
2187 "vnfd_id": vnfd_id,
2188 "vdu_name": vdu_name,
2189 }
2190 vca_index += 1
2191
2192 # create VCA and configurationStatus in db
2193 db_dict = {
2194 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2195 "configurationStatus.{}".format(vca_index): dict()
2196 }
2197 self.update_db_2("nsrs", nsr_id, db_dict)
2198
2199 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2200
2201 # Launch task
2202 task_n2vc = asyncio.ensure_future(
2203 self.instantiate_N2VC(
2204 logging_text=logging_text,
2205 vca_index=vca_index,
2206 nsi_id=nsi_id,
2207 db_nsr=db_nsr,
2208 db_vnfr=db_vnfr,
2209 vdu_id=vdu_id,
2210 kdu_name=kdu_name,
2211 vdu_index=vdu_index,
2212 deploy_params=deploy_params,
2213 config_descriptor=descriptor_config,
2214 base_folder=base_folder,
2215 nslcmop_id=nslcmop_id,
2216 stage=stage
2217 )
2218 )
2219 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2220 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2221 member_vnf_index or "", vdu_id or "")
2222
2223 # Check if this VNFD has a configured terminate action
2224 def _has_terminate_config_primitive(self, vnfd):
2225 vnf_config = vnfd.get("vnf-configuration")
2226 if vnf_config and vnf_config.get("terminate-config-primitive"):
2227 return True
2228 else:
2229 return False
2230
2231 @staticmethod
2232 def _get_terminate_config_primitive_seq_list(vnfd):
2233 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2234 # No need to check for existing primitive twice, already done before
2235 vnf_config = vnfd.get("vnf-configuration")
2236 seq_list = vnf_config.get("terminate-config-primitive")
2237 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2238 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2239 return seq_list_sorted
2240
2241 @staticmethod
2242 def _create_nslcmop(nsr_id, operation, params):
2243 """
2244 Creates a ns-lcm-opp content to be stored at database.
2245 :param nsr_id: internal id of the instance
2246 :param operation: instantiate, terminate, scale, action, ...
2247 :param params: user parameters for the operation
2248 :return: dictionary following SOL005 format
2249 """
2250 # Raise exception if invalid arguments
2251 if not (nsr_id and operation and params):
2252 raise LcmException(
2253 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2254 now = time()
2255 _id = str(uuid4())
2256 nslcmop = {
2257 "id": _id,
2258 "_id": _id,
2259 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2260 "operationState": "PROCESSING",
2261 "statusEnteredTime": now,
2262 "nsInstanceId": nsr_id,
2263 "lcmOperationType": operation,
2264 "startTime": now,
2265 "isAutomaticInvocation": False,
2266 "operationParams": params,
2267 "isCancelPending": False,
2268 "links": {
2269 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2270 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2271 }
2272 }
2273 return nslcmop
2274
2275 def _format_additional_params(self, params):
2276 params = params or {}
2277 for key, value in params.items():
2278 if str(value).startswith("!!yaml "):
2279 params[key] = yaml.safe_load(value[7:])
2280 return params
2281
2282 def _get_terminate_primitive_params(self, seq, vnf_index):
2283 primitive = seq.get('name')
2284 primitive_params = {}
2285 params = {
2286 "member_vnf_index": vnf_index,
2287 "primitive": primitive,
2288 "primitive_params": primitive_params,
2289 }
2290 desc_params = {}
2291 return self._map_primitive_params(seq, params, desc_params)
2292
2293 # sub-operations
2294
2295 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2296 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2297 if (op.get('operationState') == 'COMPLETED'):
2298 # b. Skip sub-operation
2299 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2300 return self.SUBOPERATION_STATUS_SKIP
2301 else:
2302 # c. Reintent executing sub-operation
2303 # The sub-operation exists, and operationState != 'COMPLETED'
2304 # Update operationState = 'PROCESSING' to indicate a reintent.
2305 operationState = 'PROCESSING'
2306 detailed_status = 'In progress'
2307 self._update_suboperation_status(
2308 db_nslcmop, op_index, operationState, detailed_status)
2309 # Return the sub-operation index
2310 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2311 # with arguments extracted from the sub-operation
2312 return op_index
2313
2314 # Find a sub-operation where all keys in a matching dictionary must match
2315 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2316 def _find_suboperation(self, db_nslcmop, match):
2317 if (db_nslcmop and match):
2318 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2319 for i, op in enumerate(op_list):
2320 if all(op.get(k) == match[k] for k in match):
2321 return i
2322 return self.SUBOPERATION_STATUS_NOT_FOUND
2323
2324 # Update status for a sub-operation given its index
2325 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2326 # Update DB for HA tasks
2327 q_filter = {'_id': db_nslcmop['_id']}
2328 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2329 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2330 self.db.set_one("nslcmops",
2331 q_filter=q_filter,
2332 update_dict=update_dict,
2333 fail_on_empty=False)
2334
2335 # Add sub-operation, return the index of the added sub-operation
2336 # Optionally, set operationState, detailed-status, and operationType
2337 # Status and type are currently set for 'scale' sub-operations:
2338 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2339 # 'detailed-status' : status message
2340 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2341 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2342 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2343 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2344 RO_nsr_id=None, RO_scaling_info=None):
2345 if not db_nslcmop:
2346 return self.SUBOPERATION_STATUS_NOT_FOUND
2347 # Get the "_admin.operations" list, if it exists
2348 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2349 op_list = db_nslcmop_admin.get('operations')
2350 # Create or append to the "_admin.operations" list
2351 new_op = {'member_vnf_index': vnf_index,
2352 'vdu_id': vdu_id,
2353 'vdu_count_index': vdu_count_index,
2354 'primitive': primitive,
2355 'primitive_params': mapped_primitive_params}
2356 if operationState:
2357 new_op['operationState'] = operationState
2358 if detailed_status:
2359 new_op['detailed-status'] = detailed_status
2360 if operationType:
2361 new_op['lcmOperationType'] = operationType
2362 if RO_nsr_id:
2363 new_op['RO_nsr_id'] = RO_nsr_id
2364 if RO_scaling_info:
2365 new_op['RO_scaling_info'] = RO_scaling_info
2366 if not op_list:
2367 # No existing operations, create key 'operations' with current operation as first list element
2368 db_nslcmop_admin.update({'operations': [new_op]})
2369 op_list = db_nslcmop_admin.get('operations')
2370 else:
2371 # Existing operations, append operation to list
2372 op_list.append(new_op)
2373
2374 db_nslcmop_update = {'_admin.operations': op_list}
2375 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2376 op_index = len(op_list) - 1
2377 return op_index
2378
2379 # Helper methods for scale() sub-operations
2380
2381 # pre-scale/post-scale:
2382 # Check for 3 different cases:
2383 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2384 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2385 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2386 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2387 operationType, RO_nsr_id=None, RO_scaling_info=None):
2388 # Find this sub-operation
2389 if (RO_nsr_id and RO_scaling_info):
2390 operationType = 'SCALE-RO'
2391 match = {
2392 'member_vnf_index': vnf_index,
2393 'RO_nsr_id': RO_nsr_id,
2394 'RO_scaling_info': RO_scaling_info,
2395 }
2396 else:
2397 match = {
2398 'member_vnf_index': vnf_index,
2399 'primitive': vnf_config_primitive,
2400 'primitive_params': primitive_params,
2401 'lcmOperationType': operationType
2402 }
2403 op_index = self._find_suboperation(db_nslcmop, match)
2404 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2405 # a. New sub-operation
2406 # The sub-operation does not exist, add it.
2407 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2408 # The following parameters are set to None for all kind of scaling:
2409 vdu_id = None
2410 vdu_count_index = None
2411 vdu_name = None
2412 if (RO_nsr_id and RO_scaling_info):
2413 vnf_config_primitive = None
2414 primitive_params = None
2415 else:
2416 RO_nsr_id = None
2417 RO_scaling_info = None
2418 # Initial status for sub-operation
2419 operationState = 'PROCESSING'
2420 detailed_status = 'In progress'
2421 # Add sub-operation for pre/post-scaling (zero or more operations)
2422 self._add_suboperation(db_nslcmop,
2423 vnf_index,
2424 vdu_id,
2425 vdu_count_index,
2426 vdu_name,
2427 vnf_config_primitive,
2428 primitive_params,
2429 operationState,
2430 detailed_status,
2431 operationType,
2432 RO_nsr_id,
2433 RO_scaling_info)
2434 return self.SUBOPERATION_STATUS_NEW
2435 else:
2436 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2437 # or op_index (operationState != 'COMPLETED')
2438 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2439
2440 # Function to return execution_environment id
2441
2442 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2443 # TODO vdu_index_count
2444 for vca in vca_deployed_list:
2445 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2446 return vca["ee_id"]
2447
2448 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
2449 """
2450 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2451 :param logging_text:
2452 :param db_nslcmop:
2453 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2454 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2455 :param vca_index: index in the database _admin.deployed.VCA
2456 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2457 :return: None or exception
2458 """
2459 # execute terminate_primitives
2460 terminate_primitives = config_descriptor.get("terminate-config-primitive")
2461 vdu_id = vca_deployed.get("vdu_id")
2462 vdu_count_index = vca_deployed.get("vdu_count_index")
2463 vdu_name = vca_deployed.get("vdu_name")
2464 vnf_index = vca_deployed.get("member-vnf-index")
2465 if terminate_primitives and vca_deployed.get("needed_terminate"):
2466 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2467 terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
2468 for seq in terminate_primitives:
2469 # For each sequence in list, get primitive and call _ns_execute_primitive()
2470 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2471 vnf_index, seq.get("name"))
2472 self.logger.debug(logging_text + step)
2473 # Create the primitive for each sequence, i.e. "primitive": "touch"
2474 primitive = seq.get('name')
2475 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2476 # The following 3 parameters are currently set to None for 'terminate':
2477 # vdu_id, vdu_count_index, vdu_name
2478
2479 # Add sub-operation
2480 self._add_suboperation(db_nslcmop,
2481 vnf_index,
2482 vdu_id,
2483 vdu_count_index,
2484 vdu_name,
2485 primitive,
2486 mapped_primitive_params)
2487 # Sub-operations: Call _ns_execute_primitive() instead of action()
2488 try:
2489 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2490 mapped_primitive_params)
2491 except LcmException:
2492 # this happens when VCA is not deployed. In this case it is not needed to terminate
2493 continue
2494 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2495 if result not in result_ok:
2496 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2497 "error {}".format(seq.get("name"), vnf_index, result_detail))
2498 # set that this VCA do not need terminated
2499 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2500 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2501
2502 if destroy_ee:
2503 await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2504
2505 async def _delete_N2VC(self, nsr_id: str):
2506 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2507 namespace = "." + nsr_id
2508 await self.n2vc.delete_namespace(namespace=namespace)
2509 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2510
2511 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2512 """
2513 Terminates a deployment from RO
2514 :param logging_text:
2515 :param nsr_deployed: db_nsr._admin.deployed
2516 :param nsr_id:
2517 :param nslcmop_id:
2518 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2519 this method will update only the index 2, but it will write on database the concatenated content of the list
2520 :return:
2521 """
2522 db_nsr_update = {}
2523 failed_detail = []
2524 ro_nsr_id = ro_delete_action = None
2525 if nsr_deployed and nsr_deployed.get("RO"):
2526 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2527 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2528 try:
2529 if ro_nsr_id:
2530 stage[2] = "Deleting ns from VIM."
2531 db_nsr_update["detailed-status"] = " ".join(stage)
2532 self._write_op_status(nslcmop_id, stage)
2533 self.logger.debug(logging_text + stage[2])
2534 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2535 self._write_op_status(nslcmop_id, stage)
2536 desc = await self.RO.delete("ns", ro_nsr_id)
2537 ro_delete_action = desc["action_id"]
2538 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2539 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2540 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2541 if ro_delete_action:
2542 # wait until NS is deleted from VIM
2543 stage[2] = "Waiting ns deleted from VIM."
2544 detailed_status_old = None
2545 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2546 ro_delete_action))
2547 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2548 self._write_op_status(nslcmop_id, stage)
2549
2550 delete_timeout = 20 * 60 # 20 minutes
2551 while delete_timeout > 0:
2552 desc = await self.RO.show(
2553 "ns",
2554 item_id_name=ro_nsr_id,
2555 extra_item="action",
2556 extra_item_id=ro_delete_action)
2557
2558 # deploymentStatus
2559 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2560
2561 ns_status, ns_status_info = self.RO.check_action_status(desc)
2562 if ns_status == "ERROR":
2563 raise ROclient.ROClientException(ns_status_info)
2564 elif ns_status == "BUILD":
2565 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2566 elif ns_status == "ACTIVE":
2567 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2568 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2569 break
2570 else:
2571 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2572 if stage[2] != detailed_status_old:
2573 detailed_status_old = stage[2]
2574 db_nsr_update["detailed-status"] = " ".join(stage)
2575 self._write_op_status(nslcmop_id, stage)
2576 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2577 await asyncio.sleep(5, loop=self.loop)
2578 delete_timeout -= 5
2579 else: # delete_timeout <= 0:
2580 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2581
2582 except Exception as e:
2583 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2584 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2585 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2586 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2587 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2588 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2589 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2590 failed_detail.append("delete conflict: {}".format(e))
2591 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2592 else:
2593 failed_detail.append("delete error: {}".format(e))
2594 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2595
2596 # Delete nsd
2597 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2598 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2599 try:
2600 stage[2] = "Deleting nsd from RO."
2601 db_nsr_update["detailed-status"] = " ".join(stage)
2602 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2603 self._write_op_status(nslcmop_id, stage)
2604 await self.RO.delete("nsd", ro_nsd_id)
2605 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
2606 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2607 except Exception as e:
2608 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2609 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2610 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
2611 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2612 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
2613 self.logger.debug(logging_text + failed_detail[-1])
2614 else:
2615 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
2616 self.logger.error(logging_text + failed_detail[-1])
2617
2618 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
2619 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2620 if not vnf_deployed or not vnf_deployed["id"]:
2621 continue
2622 try:
2623 ro_vnfd_id = vnf_deployed["id"]
2624 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2625 vnf_deployed["member-vnf-index"], ro_vnfd_id)
2626 db_nsr_update["detailed-status"] = " ".join(stage)
2627 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2628 self._write_op_status(nslcmop_id, stage)
2629 await self.RO.delete("vnfd", ro_vnfd_id)
2630 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
2631 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2632 except Exception as e:
2633 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2634 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2635 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
2636 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2637 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
2638 self.logger.debug(logging_text + failed_detail[-1])
2639 else:
2640 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
2641 self.logger.error(logging_text + failed_detail[-1])
2642
2643 if failed_detail:
2644 stage[2] = "Error deleting from VIM"
2645 else:
2646 stage[2] = "Deleted from VIM"
2647 db_nsr_update["detailed-status"] = " ".join(stage)
2648 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2649 self._write_op_status(nslcmop_id, stage)
2650
2651 if failed_detail:
2652 raise LcmException("; ".join(failed_detail))
2653
2654 async def terminate(self, nsr_id, nslcmop_id):
2655 # Try to lock HA task here
2656 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2657 if not task_is_locked_by_me:
2658 return
2659
2660 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2661 self.logger.debug(logging_text + "Enter")
2662 timeout_ns_terminate = self.timeout_ns_terminate
2663 db_nsr = None
2664 db_nslcmop = None
2665 exc = None
2666 error_list = [] # annotates all failed error messages
2667 db_nslcmop_update = {}
2668 autoremove = False # autoremove after terminated
2669 tasks_dict_info = {}
2670 db_nsr_update = {}
2671 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2672 # ^ contains [stage, step, VIM-status]
2673 try:
2674 # wait for any previous tasks in process
2675 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2676
2677 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2678 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2679 operation_params = db_nslcmop.get("operationParams") or {}
2680 if operation_params.get("timeout_ns_terminate"):
2681 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
2682 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2683 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2684
2685 db_nsr_update["operational-status"] = "terminating"
2686 db_nsr_update["config-status"] = "terminating"
2687 self._write_ns_status(
2688 nsr_id=nsr_id,
2689 ns_state="TERMINATING",
2690 current_operation="TERMINATING",
2691 current_operation_id=nslcmop_id,
2692 other_update=db_nsr_update
2693 )
2694 self._write_op_status(
2695 op_id=nslcmop_id,
2696 queuePosition=0,
2697 stage=stage
2698 )
2699 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
2700 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2701 return
2702
2703 stage[1] = "Getting vnf descriptors from db."
2704 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2705 db_vnfds_from_id = {}
2706 db_vnfds_from_member_index = {}
2707 # Loop over VNFRs
2708 for vnfr in db_vnfrs_list:
2709 vnfd_id = vnfr["vnfd-id"]
2710 if vnfd_id not in db_vnfds_from_id:
2711 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2712 db_vnfds_from_id[vnfd_id] = vnfd
2713 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
2714
2715 # Destroy individual execution environments when there are terminating primitives.
2716 # Rest of EE will be deleted at once
2717 if not operation_params.get("skip_terminate_primitives"):
2718 stage[0] = "Stage 2/3 execute terminating primitives."
2719 stage[1] = "Looking execution environment that needs terminate."
2720 self.logger.debug(logging_text + stage[1])
2721 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
2722 config_descriptor = None
2723 if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
2724 continue
2725 if not vca.get("member-vnf-index"):
2726 # ns
2727 config_descriptor = db_nsr.get("ns-configuration")
2728 elif vca.get("vdu_id"):
2729 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
2730 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
2731 if vdud:
2732 config_descriptor = vdud.get("vdu-configuration")
2733 elif vca.get("kdu_name"):
2734 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
2735 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
2736 if kdud:
2737 config_descriptor = kdud.get("kdu-configuration")
2738 else:
2739 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
2740 task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
2741 vca_index, False))
2742 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
2743
2744 # wait for pending tasks of terminate primitives
2745 if tasks_dict_info:
2746 self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
2747 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
2748 min(self.timeout_charm_delete, timeout_ns_terminate),
2749 stage, nslcmop_id)
2750 if error_list:
2751 return # raise LcmException("; ".join(error_list))
2752 tasks_dict_info.clear()
2753
2754 # remove All execution environments at once
2755 stage[0] = "Stage 3/3 delete all."
2756 stage[1] = "Deleting all execution environments."
2757 self.logger.debug(logging_text + stage[1])
2758
2759 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2760 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2761 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
2762
2763 # Delete from k8scluster
2764 stage[1] = "Deleting KDUs."
2765 self.logger.debug(logging_text + stage[1])
2766 # print(nsr_deployed)
2767 for kdu in get_iterable(nsr_deployed, "K8s"):
2768 if not kdu or not kdu.get("kdu-instance"):
2769 continue
2770 kdu_instance = kdu.get("kdu-instance")
2771 if kdu.get("k8scluster-type") in self.k8scluster_map:
2772 task_delete_kdu_instance = asyncio.ensure_future(
2773 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
2774 cluster_uuid=kdu.get("k8scluster-uuid"),
2775 kdu_instance=kdu_instance))
2776 else:
2777 self.logger.error(logging_text + "Unknown k8s deployment type {}".
2778 format(kdu.get("k8scluster-type")))
2779 continue
2780 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
2781
2782 # remove from RO
2783 stage[1] = "Deleting ns from VIM."
2784 task_delete_ro = asyncio.ensure_future(
2785 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
2786 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
2787
2788 # rest of staff will be done at finally
2789
2790 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2791 self.logger.error(logging_text + "Exit Exception {}".format(e))
2792 exc = e
2793 except asyncio.CancelledError:
2794 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2795 exc = "Operation was cancelled"
2796 except Exception as e:
2797 exc = traceback.format_exc()
2798 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2799 finally:
2800 if exc:
2801 error_list.append(str(exc))
2802 try:
2803 # wait for pending tasks
2804 if tasks_dict_info:
2805 stage[1] = "Waiting for terminate pending tasks."
2806 self.logger.debug(logging_text + stage[1])
2807 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
2808 stage, nslcmop_id)
2809 stage[1] = stage[2] = ""
2810 except asyncio.CancelledError:
2811 error_list.append("Cancelled")
2812 # TODO cancell all tasks
2813 except Exception as exc:
2814 error_list.append(str(exc))
2815 # update status at database
2816 if error_list:
2817 error_detail = "; ".join(error_list)
2818 # self.logger.error(logging_text + error_detail)
2819 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
2820 error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])
2821
2822 db_nsr_update["operational-status"] = "failed"
2823 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2824 db_nslcmop_update["detailed-status"] = error_detail
2825 nslcmop_operation_state = "FAILED"
2826 ns_state = "BROKEN"
2827 else:
2828 error_detail = None
2829 error_description_nsr = error_description_nslcmop = None
2830 ns_state = "NOT_INSTANTIATED"
2831 db_nsr_update["operational-status"] = "terminated"
2832 db_nsr_update["detailed-status"] = "Done"
2833 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2834 db_nslcmop_update["detailed-status"] = "Done"
2835 nslcmop_operation_state = "COMPLETED"
2836
2837 if db_nsr:
2838 self._write_ns_status(
2839 nsr_id=nsr_id,
2840 ns_state=ns_state,
2841 current_operation="IDLE",
2842 current_operation_id=None,
2843 error_description=error_description_nsr,
2844 error_detail=error_detail,
2845 other_update=db_nsr_update
2846 )
2847 if db_nslcmop:
2848 self._write_op_status(
2849 op_id=nslcmop_id,
2850 stage="",
2851 error_message=error_description_nslcmop,
2852 operation_state=nslcmop_operation_state,
2853 other_update=db_nslcmop_update,
2854 )
2855 autoremove = operation_params.get("autoremove", False)
2856 if nslcmop_operation_state:
2857 try:
2858 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2859 "operationState": nslcmop_operation_state,
2860 "autoremove": autoremove},
2861 loop=self.loop)
2862 except Exception as e:
2863 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2864
2865 self.logger.debug(logging_text + "Exit")
2866 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2867
2868 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
2869 time_start = time()
2870 error_detail_list = []
2871 error_list = []
2872 pending_tasks = list(created_tasks_info.keys())
2873 num_tasks = len(pending_tasks)
2874 num_done = 0
2875 stage[1] = "{}/{}.".format(num_done, num_tasks)
2876 self._write_op_status(nslcmop_id, stage)
2877 while pending_tasks:
2878 new_error = None
2879 _timeout = timeout + time_start - time()
2880 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
2881 return_when=asyncio.FIRST_COMPLETED)
2882 num_done += len(done)
2883 if not done: # Timeout
2884 for task in pending_tasks:
2885 new_error = created_tasks_info[task] + ": Timeout"
2886 error_detail_list.append(new_error)
2887 error_list.append(new_error)
2888 break
2889 for task in done:
2890 if task.cancelled():
2891 new_error = created_tasks_info[task] + ": Cancelled"
2892 self.logger.warn(logging_text + new_error)
2893 error_detail_list.append(new_error)
2894 error_list.append(new_error)
2895 else:
2896 exc = task.exception()
2897 if exc:
2898 new_error = created_tasks_info[task] + ": {}".format(exc)
2899 error_list.append(created_tasks_info[task])
2900 error_detail_list.append(new_error)
2901 if isinstance(exc, (DbException, N2VCException, ROclient.ROClientException, LcmException)):
2902 self.logger.error(logging_text + new_error)
2903 else:
2904 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
2905 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
2906 else:
2907 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
2908 stage[1] = "{}/{}.".format(num_done, num_tasks)
2909 if new_error:
2910 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
2911 if nsr_id: # update also nsr
2912 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
2913 "errorDetail": ". ".join(error_detail_list)})
2914 self._write_op_status(nslcmop_id, stage)
2915 return error_detail_list
2916
2917 @staticmethod
2918 def _map_primitive_params(primitive_desc, params, instantiation_params):
2919 """
2920 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2921 The default-value is used. If it is between < > it look for a value at instantiation_params
2922 :param primitive_desc: portion of VNFD/NSD that describes primitive
2923 :param params: Params provided by user
2924 :param instantiation_params: Instantiation params provided by user
2925 :return: a dictionary with the calculated params
2926 """
2927 calculated_params = {}
2928 for parameter in primitive_desc.get("parameter", ()):
2929 param_name = parameter["name"]
2930 if param_name in params:
2931 calculated_params[param_name] = params[param_name]
2932 elif "default-value" in parameter or "value" in parameter:
2933 if "value" in parameter:
2934 calculated_params[param_name] = parameter["value"]
2935 else:
2936 calculated_params[param_name] = parameter["default-value"]
2937 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2938 and calculated_params[param_name].endswith(">"):
2939 if calculated_params[param_name][1:-1] in instantiation_params:
2940 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2941 else:
2942 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2943 format(calculated_params[param_name], primitive_desc["name"]))
2944 else:
2945 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2946 format(param_name, primitive_desc["name"]))
2947
2948 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2949 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2950 width=256)
2951 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2952 calculated_params[param_name] = calculated_params[param_name][7:]
2953
2954 # add always ns_config_info if primitive name is config
2955 if primitive_desc["name"] == "config":
2956 if "ns_config_info" in instantiation_params:
2957 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2958 return calculated_params
2959
2960 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_name, vdu_count_index, kdu_name=None):
2961 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
2962 for vca in deployed_vca:
2963 if not vca:
2964 continue
2965 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
2966 continue
2967 if vdu_name and vdu_name != vca["vdu_name"]:
2968 continue
2969 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
2970 continue
2971 if kdu_name and kdu_name != vca["kdu_name"]:
2972 continue
2973 break
2974 else:
2975 # vca_deployed not found
2976 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2977 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2978
2979 # get ee_id
2980 ee_id = vca.get("ee_id")
2981 if not ee_id:
2982 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2983 "execution environment"
2984 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2985 return ee_id
2986
2987 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
2988 retries_interval=30) -> (str, str):
2989 try:
2990 if primitive == "config":
2991 primitive_params = {"params": primitive_params}
2992
2993 while retries >= 0:
2994 try:
2995 output = await self.n2vc.exec_primitive(
2996 ee_id=ee_id,
2997 primitive_name=primitive,
2998 params_dict=primitive_params
2999 )
3000 # execution was OK
3001 break
3002 except Exception as e:
3003 retries -= 1
3004 if retries >= 0:
3005 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3006 # wait and retry
3007 await asyncio.sleep(retries_interval, loop=self.loop)
3008 else:
3009 return 'FAIL', 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e)
3010
3011 return 'COMPLETED', output
3012
3013 except LcmException:
3014 raise
3015 except Exception as e:
3016 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3017
3018 async def action(self, nsr_id, nslcmop_id):
3019
3020 # Try to lock HA task here
3021 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3022 if not task_is_locked_by_me:
3023 return
3024
3025 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3026 self.logger.debug(logging_text + "Enter")
3027 # get all needed from database
3028 db_nsr = None
3029 db_nslcmop = None
3030 db_nsr_update = {}
3031 db_nslcmop_update = {}
3032 nslcmop_operation_state = None
3033 nslcmop_operation_state_detail = None
3034 exc = None
3035 try:
3036 # wait for any previous tasks in process
3037 step = "Waiting for previous operations to terminate"
3038 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3039
3040 self._write_ns_status(
3041 nsr_id=nsr_id,
3042 ns_state=None,
3043 current_operation="RUNNING ACTION",
3044 current_operation_id=nslcmop_id
3045 )
3046
3047 step = "Getting information from database"
3048 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3049 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3050
3051 nsr_deployed = db_nsr["_admin"].get("deployed")
3052 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3053 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3054 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3055 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3056 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3057
3058 if vnf_index:
3059 step = "Getting vnfr from database"
3060 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3061 step = "Getting vnfd from database"
3062 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3063 else:
3064 if db_nsr.get("nsd"):
3065 db_nsd = db_nsr.get("nsd") # TODO this will be removed
3066 else:
3067 step = "Getting nsd from database"
3068 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3069
3070 # for backward compatibility
3071 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3072 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3073 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3074 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3075
3076 primitive = db_nslcmop["operationParams"]["primitive"]
3077 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3078
3079 # look for primitive
3080 config_primitive_desc = None
3081 if vdu_id:
3082 for vdu in get_iterable(db_vnfd, "vdu"):
3083 if vdu_id == vdu["id"]:
3084 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
3085 if config_primitive["name"] == primitive:
3086 config_primitive_desc = config_primitive
3087 break
3088 elif kdu_name:
3089 self.logger.debug(logging_text + "Checking actions in KDUs")
3090 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3091 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
3092 if primitive_params:
3093 desc_params.update(primitive_params)
3094 # TODO Check if we will need something at vnf level
3095 index = 0
3096 for kdu in get_iterable(nsr_deployed, "K8s"):
3097 if kdu_name == kdu["kdu-name"]:
3098 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
3099 "path": "_admin.deployed.K8s.{}".format(index)}
3100 if primitive == "upgrade":
3101 if desc_params.get("kdu_model"):
3102 kdu_model = desc_params.get("kdu_model")
3103 del desc_params["kdu_model"]
3104 else:
3105 kdu_model = kdu.get("kdu-model")
3106 parts = kdu_model.split(sep=":")
3107 if len(parts) == 2:
3108 kdu_model = parts[0]
3109
3110 if kdu.get("k8scluster-type") in self.k8scluster_map:
3111 output = await self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3112 cluster_uuid=kdu.get("k8scluster-uuid"),
3113 kdu_instance=kdu.get("kdu-instance"),
3114 atomic=True, kdu_model=kdu_model,
3115 params=desc_params, db_dict=db_dict,
3116 timeout=300)
3117
3118 else:
3119 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3120 raise LcmException(msg)
3121
3122 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
3123 break
3124 elif primitive == "rollback":
3125 if kdu.get("k8scluster-type") in self.k8scluster_map:
3126 output = await self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3127 cluster_uuid=kdu.get("k8scluster-uuid"),
3128 kdu_instance=kdu.get("kdu-instance"),
3129 db_dict=db_dict)
3130 else:
3131 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3132 raise LcmException(msg)
3133 break
3134 elif primitive == "status":
3135 if kdu.get("k8scluster-type") in self.k8scluster_map:
3136 output = await self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3137 cluster_uuid=kdu.get("k8scluster-uuid"),
3138 kdu_instance=kdu.get("kdu-instance"))
3139 else:
3140 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3141 raise LcmException(msg)
3142 break
3143 index += 1
3144
3145 else:
3146 raise LcmException("KDU '{}' not found".format(kdu_name))
3147 if output:
3148 db_nslcmop_update["detailed-status"] = output
3149 db_nslcmop_update["operationState"] = 'COMPLETED'
3150 db_nslcmop_update["statusEnteredTime"] = time()
3151 else:
3152 db_nslcmop_update["detailed-status"] = ''
3153 db_nslcmop_update["operationState"] = 'FAILED'
3154 db_nslcmop_update["statusEnteredTime"] = time()
3155 return
3156 elif vnf_index:
3157 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3158 if config_primitive["name"] == primitive:
3159 config_primitive_desc = config_primitive
3160 break
3161 else:
3162 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
3163 if config_primitive["name"] == primitive:
3164 config_primitive_desc = config_primitive
3165 break
3166
3167 if not config_primitive_desc:
3168 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3169 format(primitive))
3170
3171 desc_params = {}
3172 if vnf_index:
3173 if db_vnfr.get("additionalParamsForVnf"):
3174 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
3175 if vdu_id:
3176 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3177 if vdur.get("additionalParams"):
3178 desc_params = self._format_additional_params(vdur["additionalParams"])
3179 else:
3180 if db_nsr.get("additionalParamsForNs"):
3181 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
3182
3183 # TODO check if ns is in a proper status
3184 result, detailed_status = await self._ns_execute_primitive(
3185 self._look_for_deployed_vca(nsr_deployed["VCA"],
3186 member_vnf_index=vnf_index,
3187 vdu_id=vdu_id,
3188 vdu_name=vdu_name,
3189 vdu_count_index=vdu_count_index),
3190 primitive=primitive,
3191 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
3192
3193 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
3194 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
3195 db_nslcmop_update["statusEnteredTime"] = time()
3196 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
3197 return # database update is called inside finally
3198
3199 except (DbException, LcmException) as e:
3200 self.logger.error(logging_text + "Exit Exception {}".format(e))
3201 exc = e
3202 except asyncio.CancelledError:
3203 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3204 exc = "Operation was cancelled"
3205 except Exception as e:
3206 exc = traceback.format_exc()
3207 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3208 finally:
3209 if exc and db_nslcmop:
3210 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
3211 "FAILED {}: {}".format(step, exc)
3212 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3213 db_nslcmop_update["statusEnteredTime"] = time()
3214 try:
3215 if db_nslcmop_update:
3216 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3217 if db_nsr:
3218 self._write_ns_status(
3219 nsr_id=nsr_id,
3220 ns_state=None,
3221 current_operation="IDLE",
3222 current_operation_id=None,
3223 other_update=db_nsr_update
3224 )
3225 if exc:
3226 self._write_op_status(
3227 op_id=nslcmop_id,
3228 error_message=nslcmop_operation_state_detail
3229 )
3230 except DbException as e:
3231 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3232 self.logger.debug(logging_text + "Exit")
3233 if nslcmop_operation_state:
3234 try:
3235 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3236 "operationState": nslcmop_operation_state},
3237 loop=self.loop)
3238 except Exception as e:
3239 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3240 self.logger.debug(logging_text + "Exit")
3241 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3242 return nslcmop_operation_state, nslcmop_operation_state_detail
3243
3244 async def scale(self, nsr_id, nslcmop_id):
3245
3246 # Try to lock HA task here
3247 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3248 if not task_is_locked_by_me:
3249 return
3250
3251 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3252 self.logger.debug(logging_text + "Enter")
3253 # get all needed from database
3254 db_nsr = None
3255 db_nslcmop = None
3256 db_nslcmop_update = {}
3257 nslcmop_operation_state = None
3258 db_nsr_update = {}
3259 exc = None
3260 # in case of error, indicates what part of scale was failed to put nsr at error status
3261 scale_process = None
3262 old_operational_status = ""
3263 old_config_status = ""
3264 vnfr_scaled = False
3265 try:
3266 # wait for any previous tasks in process
3267 step = "Waiting for previous operations to terminate"
3268 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3269
3270 self._write_ns_status(
3271 nsr_id=nsr_id,
3272 ns_state=None,
3273 current_operation="SCALING",
3274 current_operation_id=nslcmop_id
3275 )
3276
3277 step = "Getting nslcmop from database"
3278 self.logger.debug(step + " after having waited for previous tasks to be completed")
3279 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3280 step = "Getting nsr from database"
3281 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3282
3283 old_operational_status = db_nsr["operational-status"]
3284 old_config_status = db_nsr["config-status"]
3285 step = "Parsing scaling parameters"
3286 # self.logger.debug(step)
3287 db_nsr_update["operational-status"] = "scaling"
3288 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3289 nsr_deployed = db_nsr["_admin"].get("deployed")
3290
3291 #######
3292 nsr_deployed = db_nsr["_admin"].get("deployed")
3293 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3294 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3295 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3296 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3297 #######
3298
3299 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3300 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3301 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3302 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3303 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3304
3305 # for backward compatibility
3306 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3307 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3308 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3309 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3310
3311 step = "Getting vnfr from database"
3312 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3313 step = "Getting vnfd from database"
3314 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3315
3316 step = "Getting scaling-group-descriptor"
3317 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3318 if scaling_descriptor["name"] == scaling_group:
3319 break
3320 else:
3321 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3322 "at vnfd:scaling-group-descriptor".format(scaling_group))
3323
3324 # cooldown_time = 0
3325 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3326 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3327 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3328 # break
3329
3330 # TODO check if ns is in a proper status
3331 step = "Sending scale order to VIM"
3332 nb_scale_op = 0
3333 if not db_nsr["_admin"].get("scaling-group"):
3334 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3335 admin_scale_index = 0
3336 else:
3337 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3338 if admin_scale_info["name"] == scaling_group:
3339 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3340 break
3341 else: # not found, set index one plus last element and add new entry with the name
3342 admin_scale_index += 1
3343 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3344 RO_scaling_info = []
3345 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3346 if scaling_type == "SCALE_OUT":
3347 # count if max-instance-count is reached
3348 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3349 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3350 if nb_scale_op >= max_instance_count:
3351 raise LcmException("reached the limit of {} (max-instance-count) "
3352 "scaling-out operations for the "
3353 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3354
3355 nb_scale_op += 1
3356 vdu_scaling_info["scaling_direction"] = "OUT"
3357 vdu_scaling_info["vdu-create"] = {}
3358 for vdu_scale_info in scaling_descriptor["vdu"]:
3359 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3360 "type": "create", "count": vdu_scale_info.get("count", 1)})
3361 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3362
3363 elif scaling_type == "SCALE_IN":
3364 # count if min-instance-count is reached
3365 min_instance_count = 0
3366 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3367 min_instance_count = int(scaling_descriptor["min-instance-count"])
3368 if nb_scale_op <= min_instance_count:
3369 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3370 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3371 nb_scale_op -= 1
3372 vdu_scaling_info["scaling_direction"] = "IN"
3373 vdu_scaling_info["vdu-delete"] = {}
3374 for vdu_scale_info in scaling_descriptor["vdu"]:
3375 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3376 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3377 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3378
3379 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3380 vdu_create = vdu_scaling_info.get("vdu-create")
3381 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3382 if vdu_scaling_info["scaling_direction"] == "IN":
3383 for vdur in reversed(db_vnfr["vdur"]):
3384 if vdu_delete.get(vdur["vdu-id-ref"]):
3385 vdu_delete[vdur["vdu-id-ref"]] -= 1
3386 vdu_scaling_info["vdu"].append({
3387 "name": vdur["name"],
3388 "vdu_id": vdur["vdu-id-ref"],
3389 "interface": []
3390 })
3391 for interface in vdur["interfaces"]:
3392 vdu_scaling_info["vdu"][-1]["interface"].append({
3393 "name": interface["name"],
3394 "ip_address": interface["ip-address"],
3395 "mac_address": interface.get("mac-address"),
3396 })
3397 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3398
3399 # PRE-SCALE BEGIN
3400 step = "Executing pre-scale vnf-config-primitive"
3401 if scaling_descriptor.get("scaling-config-action"):
3402 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3403 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3404 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3405 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3406 step = db_nslcmop_update["detailed-status"] = \
3407 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3408
3409 # look for primitive
3410 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3411 if config_primitive["name"] == vnf_config_primitive:
3412 break
3413 else:
3414 raise LcmException(
3415 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3416 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3417 "primitive".format(scaling_group, config_primitive))
3418
3419 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3420 if db_vnfr.get("additionalParamsForVnf"):
3421 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3422
3423 scale_process = "VCA"
3424 db_nsr_update["config-status"] = "configuring pre-scaling"
3425 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3426
3427 # Pre-scale reintent check: Check if this sub-operation has been executed before
3428 op_index = self._check_or_add_scale_suboperation(
3429 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3430 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3431 # Skip sub-operation
3432 result = 'COMPLETED'
3433 result_detail = 'Done'
3434 self.logger.debug(logging_text +
3435 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3436 vnf_config_primitive, result, result_detail))
3437 else:
3438 if (op_index == self.SUBOPERATION_STATUS_NEW):
3439 # New sub-operation: Get index of this sub-operation
3440 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3441 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3442 format(vnf_config_primitive))
3443 else:
3444 # Reintent: Get registered params for this existing sub-operation
3445 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3446 vnf_index = op.get('member_vnf_index')
3447 vnf_config_primitive = op.get('primitive')
3448 primitive_params = op.get('primitive_params')
3449 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3450 format(vnf_config_primitive))
3451 # Execute the primitive, either with new (first-time) or registered (reintent) args
3452 result, result_detail = await self._ns_execute_primitive(
3453 self._look_for_deployed_vca(nsr_deployed["VCA"],
3454 member_vnf_index=vnf_index,
3455 vdu_id=None,
3456 vdu_name=None,
3457 vdu_count_index=None),
3458 vnf_config_primitive, primitive_params)
3459 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3460 vnf_config_primitive, result, result_detail))
3461 # Update operationState = COMPLETED | FAILED
3462 self._update_suboperation_status(
3463 db_nslcmop, op_index, result, result_detail)
3464
3465 if result == "FAILED":
3466 raise LcmException(result_detail)
3467 db_nsr_update["config-status"] = old_config_status
3468 scale_process = None
3469 # PRE-SCALE END
3470
3471 # SCALE RO - BEGIN
3472 # Should this block be skipped if 'RO_nsr_id' == None ?
3473 # if (RO_nsr_id and RO_scaling_info):
3474 if RO_scaling_info:
3475 scale_process = "RO"
3476 # Scale RO reintent check: Check if this sub-operation has been executed before
3477 op_index = self._check_or_add_scale_suboperation(
3478 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3479 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3480 # Skip sub-operation
3481 result = 'COMPLETED'
3482 result_detail = 'Done'
3483 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3484 result, result_detail))
3485 else:
3486 if (op_index == self.SUBOPERATION_STATUS_NEW):
3487 # New sub-operation: Get index of this sub-operation
3488 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3489 self.logger.debug(logging_text + "New sub-operation RO")
3490 else:
3491 # Reintent: Get registered params for this existing sub-operation
3492 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3493 RO_nsr_id = op.get('RO_nsr_id')
3494 RO_scaling_info = op.get('RO_scaling_info')
3495 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3496 vnf_config_primitive))
3497
3498 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3499 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3500 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3501 # wait until ready
3502 RO_nslcmop_id = RO_desc["instance_action_id"]
3503 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3504
3505 RO_task_done = False
3506 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3507 detailed_status_old = None
3508 self.logger.debug(logging_text + step)
3509
3510 deployment_timeout = 1 * 3600 # One hour
3511 while deployment_timeout > 0:
3512 if not RO_task_done:
3513 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3514 extra_item_id=RO_nslcmop_id)
3515
3516 # deploymentStatus
3517 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3518
3519 ns_status, ns_status_info = self.RO.check_action_status(desc)
3520 if ns_status == "ERROR":
3521 raise ROclient.ROClientException(ns_status_info)
3522 elif ns_status == "BUILD":
3523 detailed_status = step + "; {}".format(ns_status_info)
3524 elif ns_status == "ACTIVE":
3525 RO_task_done = True
3526 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3527 self.logger.debug(logging_text + step)
3528 else:
3529 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3530 else:
3531
3532 if ns_status == "ERROR":
3533 raise ROclient.ROClientException(ns_status_info)
3534 elif ns_status == "BUILD":
3535 detailed_status = step + "; {}".format(ns_status_info)
3536 elif ns_status == "ACTIVE":
3537 step = detailed_status = \
3538 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3539 if not vnfr_scaled:
3540 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3541 vnfr_scaled = True
3542 try:
3543 desc = await self.RO.show("ns", RO_nsr_id)
3544
3545 # deploymentStatus
3546 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3547
3548 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3549 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3550 break
3551 except LcmExceptionNoMgmtIP:
3552 pass
3553 else:
3554 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3555 if detailed_status != detailed_status_old:
3556 self._update_suboperation_status(
3557 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3558 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3559 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3560
3561 await asyncio.sleep(5, loop=self.loop)
3562 deployment_timeout -= 5
3563 if deployment_timeout <= 0:
3564 self._update_suboperation_status(
3565 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3566 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3567
3568 # update VDU_SCALING_INFO with the obtained ip_addresses
3569 if vdu_scaling_info["scaling_direction"] == "OUT":
3570 for vdur in reversed(db_vnfr["vdur"]):
3571 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3572 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3573 vdu_scaling_info["vdu"].append({
3574 "name": vdur["name"],
3575 "vdu_id": vdur["vdu-id-ref"],
3576 "interface": []
3577 })
3578 for interface in vdur["interfaces"]:
3579 vdu_scaling_info["vdu"][-1]["interface"].append({
3580 "name": interface["name"],
3581 "ip_address": interface["ip-address"],
3582 "mac_address": interface.get("mac-address"),
3583 })
3584 del vdu_scaling_info["vdu-create"]
3585
3586 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3587 # SCALE RO - END
3588
3589 scale_process = None
3590 if db_nsr_update:
3591 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3592
3593 # POST-SCALE BEGIN
3594 # execute primitive service POST-SCALING
3595 step = "Executing post-scale vnf-config-primitive"
3596 if scaling_descriptor.get("scaling-config-action"):
3597 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3598 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3599 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3600 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3601 step = db_nslcmop_update["detailed-status"] = \
3602 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3603
3604 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3605 if db_vnfr.get("additionalParamsForVnf"):
3606 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3607
3608 # look for primitive
3609 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3610 if config_primitive["name"] == vnf_config_primitive:
3611 break
3612 else:
3613 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3614 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3615 "match any vnf-configuration:config-primitive".format(scaling_group,
3616 config_primitive))
3617 scale_process = "VCA"
3618 db_nsr_update["config-status"] = "configuring post-scaling"
3619 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3620
3621 # Post-scale reintent check: Check if this sub-operation has been executed before
3622 op_index = self._check_or_add_scale_suboperation(
3623 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3624 if op_index == self.SUBOPERATION_STATUS_SKIP:
3625 # Skip sub-operation
3626 result = 'COMPLETED'
3627 result_detail = 'Done'
3628 self.logger.debug(logging_text +
3629 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3630 format(vnf_config_primitive, result, result_detail))
3631 else:
3632 if op_index == self.SUBOPERATION_STATUS_NEW:
3633 # New sub-operation: Get index of this sub-operation
3634 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3635 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3636 format(vnf_config_primitive))
3637 else:
3638 # Reintent: Get registered params for this existing sub-operation
3639 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3640 vnf_index = op.get('member_vnf_index')
3641 vnf_config_primitive = op.get('primitive')
3642 primitive_params = op.get('primitive_params')
3643 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3644 format(vnf_config_primitive))
3645 # Execute the primitive, either with new (first-time) or registered (reintent) args
3646 result, result_detail = await self._ns_execute_primitive(
3647 self._look_for_deployed_vca(nsr_deployed["VCA"],
3648 member_vnf_index=vnf_index,
3649 vdu_id=None,
3650 vdu_name=None,
3651 vdu_count_index=None),
3652 vnf_config_primitive, primitive_params)
3653 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3654 vnf_config_primitive, result, result_detail))
3655 # Update operationState = COMPLETED | FAILED
3656 self._update_suboperation_status(
3657 db_nslcmop, op_index, result, result_detail)
3658
3659 if result == "FAILED":
3660 raise LcmException(result_detail)
3661 db_nsr_update["config-status"] = old_config_status
3662 scale_process = None
3663 # POST-SCALE END
3664
3665 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3666 db_nslcmop_update["statusEnteredTime"] = time()
3667 db_nslcmop_update["detailed-status"] = "done"
3668 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3669 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3670 else old_operational_status
3671 db_nsr_update["config-status"] = old_config_status
3672 return
3673 except (ROclient.ROClientException, DbException, LcmException) as e:
3674 self.logger.error(logging_text + "Exit Exception {}".format(e))
3675 exc = e
3676 except asyncio.CancelledError:
3677 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3678 exc = "Operation was cancelled"
3679 except Exception as e:
3680 exc = traceback.format_exc()
3681 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3682 finally:
3683 self._write_ns_status(
3684 nsr_id=nsr_id,
3685 ns_state=None,
3686 current_operation="IDLE",
3687 current_operation_id=None
3688 )
3689 if exc:
3690 if db_nslcmop:
3691 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3692 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3693 db_nslcmop_update["statusEnteredTime"] = time()
3694 if db_nsr:
3695 db_nsr_update["operational-status"] = old_operational_status
3696 db_nsr_update["config-status"] = old_config_status
3697 db_nsr_update["detailed-status"] = ""
3698 if scale_process:
3699 if "VCA" in scale_process:
3700 db_nsr_update["config-status"] = "failed"
3701 if "RO" in scale_process:
3702 db_nsr_update["operational-status"] = "failed"
3703 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3704 exc)
3705 try:
3706 if db_nslcmop and db_nslcmop_update:
3707 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3708 if db_nsr:
3709 self._write_ns_status(
3710 nsr_id=nsr_id,
3711 ns_state=None,
3712 current_operation="IDLE",
3713 current_operation_id=None,
3714 other_update=db_nsr_update
3715 )
3716
3717 except DbException as e:
3718 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3719 if nslcmop_operation_state:
3720 try:
3721 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3722 "operationState": nslcmop_operation_state},
3723 loop=self.loop)
3724 # if cooldown_time:
3725 # await asyncio.sleep(cooldown_time, loop=self.loop)
3726 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3727 except Exception as e:
3728 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3729 self.logger.debug(logging_text + "Exit")
3730 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")