Feature 8157. Parses new configuration information model
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_juju_conn import K8sJujuConnector
32
33 from osm_common.dbbase import DbException
34 from osm_common.fsbase import FsException
35
36 from n2vc.n2vc_juju_conn import N2VCJujuConnector
37 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
38
39 from osm_lcm.lcm_helm_conn import LCMHelmConn
40
41 from copy import copy, deepcopy
42 from http import HTTPStatus
43 from time import time
44 from uuid import uuid4
45 from functools import partial
46
47 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
48
49
50 class N2VCJujuConnectorLCM(N2VCJujuConnector):
51
52 async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
53 progress_timeout: float = None, total_timeout: float = None,
54 artifact_path: str = None, vca_type: str = None) -> (str, dict):
55 # admit two new parameters, artifact_path and vca_type
56 if vca_type == "k8s_proxy_charm":
57 ee_id = await self.n2vc.install_k8s_proxy_charm(
58 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
59 namespace=namespace,
60 artifact_path=artifact_path,
61 db_dict=db_dict)
62 return ee_id, None
63 else:
64 return await super().create_execution_environment(
65 namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
66 progress_timeout=progress_timeout, total_timeout=total_timeout)
67
68 async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
69 progress_timeout: float = None, total_timeout: float = None,
70 config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
71 if vca_type == "k8s_proxy_charm":
72 return
73 return await super().install_configuration_sw(
74 ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
75 total_timeout=total_timeout, config=config, num_units=num_units)
76
77
78 class NsLcm(LcmBase):
79 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
80 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
81 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
82 timeout_charm_delete = 10 * 60
83 timeout_primitive = 30 * 60 # timeout for primitive execution
84 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
85
86 SUBOPERATION_STATUS_NOT_FOUND = -1
87 SUBOPERATION_STATUS_NEW = -2
88 SUBOPERATION_STATUS_SKIP = -3
89 task_name_deploy_vca = "Deploying VCA"
90
91 def __init__(self, db, msg, fs, lcm_tasks, config, loop):
92 """
93 Init, Connect to database, filesystem storage, and messaging
94 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
95 :return: None
96 """
97 super().__init__(
98 db=db,
99 msg=msg,
100 fs=fs,
101 logger=logging.getLogger('lcm.ns')
102 )
103
104 self.loop = loop
105 self.lcm_tasks = lcm_tasks
106 self.timeout = config["timeout"]
107 self.ro_config = config["ro_config"]
108 self.ng_ro = config["ro_config"].get("ng")
109 self.vca_config = config["VCA"].copy()
110
111 # create N2VC connector
112 self.n2vc = N2VCJujuConnectorLCM(
113 db=self.db,
114 fs=self.fs,
115 log=self.logger,
116 loop=self.loop,
117 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
118 username=self.vca_config.get('user', None),
119 vca_config=self.vca_config,
120 on_update_db=self._on_update_n2vc_db
121 )
122
123 self.conn_helm_ee = LCMHelmConn(
124 db=self.db,
125 fs=self.fs,
126 log=self.logger,
127 loop=self.loop,
128 url=None,
129 username=None,
130 vca_config=self.vca_config,
131 on_update_db=self._on_update_n2vc_db
132 )
133
134 self.k8sclusterhelm = K8sHelmConnector(
135 kubectl_command=self.vca_config.get("kubectlpath"),
136 helm_command=self.vca_config.get("helmpath"),
137 fs=self.fs,
138 log=self.logger,
139 db=self.db,
140 on_update_db=None,
141 )
142
143 self.k8sclusterjuju = K8sJujuConnector(
144 kubectl_command=self.vca_config.get("kubectlpath"),
145 juju_command=self.vca_config.get("jujupath"),
146 fs=self.fs,
147 log=self.logger,
148 db=self.db,
149 on_update_db=None,
150 )
151
152 self.k8scluster_map = {
153 "helm-chart": self.k8sclusterhelm,
154 "chart": self.k8sclusterhelm,
155 "juju-bundle": self.k8sclusterjuju,
156 "juju": self.k8sclusterjuju,
157 }
158
159 self.vca_map = {
160 "lxc_proxy_charm": self.n2vc,
161 "native_charm": self.n2vc,
162 "k8s_proxy_charm": self.n2vc,
163 "helm": self.conn_helm_ee
164 }
165
166 # create RO client
167 if self.ng_ro:
168 self.RO = NgRoClient(self.loop, **self.ro_config)
169 else:
170 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
171
172 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
173
174 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
175
176 try:
177 # TODO filter RO descriptor fields...
178
179 # write to database
180 db_dict = dict()
181 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
182 db_dict['deploymentStatus'] = ro_descriptor
183 self.update_db_2("nsrs", nsrs_id, db_dict)
184
185 except Exception as e:
186 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
187
188 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
189
190 # remove last dot from path (if exists)
191 if path.endswith('.'):
192 path = path[:-1]
193
194 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
195 # .format(table, filter, path, updated_data))
196
197 try:
198
199 nsr_id = filter.get('_id')
200
201 # read ns record from database
202 nsr = self.db.get_one(table='nsrs', q_filter=filter)
203 current_ns_status = nsr.get('nsState')
204
205 # get vca status for NS
206 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
207
208 # vcaStatus
209 db_dict = dict()
210 db_dict['vcaStatus'] = status_dict
211
212 # update configurationStatus for this VCA
213 try:
214 vca_index = int(path[path.rfind(".")+1:])
215
216 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
217 vca_status = vca_list[vca_index].get('status')
218
219 configuration_status_list = nsr.get('configurationStatus')
220 config_status = configuration_status_list[vca_index].get('status')
221
222 if config_status == 'BROKEN' and vca_status != 'failed':
223 db_dict['configurationStatus'][vca_index] = 'READY'
224 elif config_status != 'BROKEN' and vca_status == 'failed':
225 db_dict['configurationStatus'][vca_index] = 'BROKEN'
226 except Exception as e:
227 # not update configurationStatus
228 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
229
230 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
231 # if nsState = 'DEGRADED' check if all is OK
232 is_degraded = False
233 if current_ns_status in ('READY', 'DEGRADED'):
234 error_description = ''
235 # check machines
236 if status_dict.get('machines'):
237 for machine_id in status_dict.get('machines'):
238 machine = status_dict.get('machines').get(machine_id)
239 # check machine agent-status
240 if machine.get('agent-status'):
241 s = machine.get('agent-status').get('status')
242 if s != 'started':
243 is_degraded = True
244 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
245 # check machine instance status
246 if machine.get('instance-status'):
247 s = machine.get('instance-status').get('status')
248 if s != 'running':
249 is_degraded = True
250 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
251 # check applications
252 if status_dict.get('applications'):
253 for app_id in status_dict.get('applications'):
254 app = status_dict.get('applications').get(app_id)
255 # check application status
256 if app.get('status'):
257 s = app.get('status').get('status')
258 if s != 'active':
259 is_degraded = True
260 error_description += 'application {} status={} ; '.format(app_id, s)
261
262 if error_description:
263 db_dict['errorDescription'] = error_description
264 if current_ns_status == 'READY' and is_degraded:
265 db_dict['nsState'] = 'DEGRADED'
266 if current_ns_status == 'DEGRADED' and not is_degraded:
267 db_dict['nsState'] = 'READY'
268
269 # write to database
270 self.update_db_2("nsrs", nsr_id, db_dict)
271
272 except (asyncio.CancelledError, asyncio.TimeoutError):
273 raise
274 except Exception as e:
275 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
276
277 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
278 """
279 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
280 :param vnfd: input vnfd
281 :param new_id: overrides vnf id if provided
282 :param additionalParams: Instantiation params for VNFs provided
283 :param nsrId: Id of the NSR
284 :return: copy of vnfd
285 """
286 try:
287 vnfd_RO = deepcopy(vnfd)
288 # remove unused by RO configuration, monitoring, scaling and internal keys
289 vnfd_RO.pop("_id", None)
290 vnfd_RO.pop("_admin", None)
291 vnfd_RO.pop("vnf-configuration", None)
292 vnfd_RO.pop("monitoring-param", None)
293 vnfd_RO.pop("scaling-group-descriptor", None)
294 vnfd_RO.pop("kdu", None)
295 vnfd_RO.pop("k8s-cluster", None)
296 if new_id:
297 vnfd_RO["id"] = new_id
298
299 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
300 for vdu in get_iterable(vnfd_RO, "vdu"):
301 cloud_init_file = None
302 if vdu.get("cloud-init-file"):
303 base_folder = vnfd["_admin"]["storage"]
304 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
305 vdu["cloud-init-file"])
306 with self.fs.file_open(cloud_init_file, "r") as ci_file:
307 cloud_init_content = ci_file.read()
308 vdu.pop("cloud-init-file", None)
309 elif vdu.get("cloud-init"):
310 cloud_init_content = vdu["cloud-init"]
311 else:
312 continue
313
314 env = Environment()
315 ast = env.parse(cloud_init_content)
316 mandatory_vars = meta.find_undeclared_variables(ast)
317 if mandatory_vars:
318 for var in mandatory_vars:
319 if not additionalParams or var not in additionalParams.keys():
320 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
321 "file, must be provided in the instantiation parameters inside the "
322 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
323 template = Template(cloud_init_content)
324 cloud_init_content = template.render(additionalParams or {})
325 vdu["cloud-init"] = cloud_init_content
326
327 return vnfd_RO
328 except FsException as e:
329 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
330 format(vnfd["id"], vdu["id"], cloud_init_file, e))
331 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
332 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
333 format(vnfd["id"], vdu["id"], e))
334
335 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
336 """
337 Creates a RO ns descriptor from OSM ns_instantiate params
338 :param ns_params: OSM instantiate params
339 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
340 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
341 :return: The RO ns descriptor
342 """
343 vim_2_RO = {}
344 wim_2_RO = {}
345 # TODO feature 1417: Check that no instantiation is set over PDU
346 # check if PDU forces a concrete vim-network-id and add it
347 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
348
349 def vim_account_2_RO(vim_account):
350 if vim_account in vim_2_RO:
351 return vim_2_RO[vim_account]
352
353 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
354 if db_vim["_admin"]["operationalState"] != "ENABLED":
355 raise LcmException("VIM={} is not available. operationalState={}".format(
356 vim_account, db_vim["_admin"]["operationalState"]))
357 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
358 vim_2_RO[vim_account] = RO_vim_id
359 return RO_vim_id
360
361 def wim_account_2_RO(wim_account):
362 if isinstance(wim_account, str):
363 if wim_account in wim_2_RO:
364 return wim_2_RO[wim_account]
365
366 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
367 if db_wim["_admin"]["operationalState"] != "ENABLED":
368 raise LcmException("WIM={} is not available. operationalState={}".format(
369 wim_account, db_wim["_admin"]["operationalState"]))
370 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
371 wim_2_RO[wim_account] = RO_wim_id
372 return RO_wim_id
373 else:
374 return wim_account
375
376 def ip_profile_2_RO(ip_profile):
377 RO_ip_profile = deepcopy((ip_profile))
378 if "dns-server" in RO_ip_profile:
379 if isinstance(RO_ip_profile["dns-server"], list):
380 RO_ip_profile["dns-address"] = []
381 for ds in RO_ip_profile.pop("dns-server"):
382 RO_ip_profile["dns-address"].append(ds['address'])
383 else:
384 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
385 if RO_ip_profile.get("ip-version") == "ipv4":
386 RO_ip_profile["ip-version"] = "IPv4"
387 if RO_ip_profile.get("ip-version") == "ipv6":
388 RO_ip_profile["ip-version"] = "IPv6"
389 if "dhcp-params" in RO_ip_profile:
390 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
391 return RO_ip_profile
392
393 if not ns_params:
394 return None
395 RO_ns_params = {
396 # "name": ns_params["nsName"],
397 # "description": ns_params.get("nsDescription"),
398 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
399 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
400 # "scenario": ns_params["nsdId"],
401 }
402 # set vim_account of each vnf if different from general vim_account.
403 # Get this information from <vnfr> database content, key vim-account-id
404 # Vim account can be set by placement_engine and it may be different from
405 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
406 for vnf_index, vnfr in db_vnfrs.items():
407 if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
408 populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
409
410 n2vc_key_list = n2vc_key_list or []
411 for vnfd_ref, vnfd in vnfd_dict.items():
412 vdu_needed_access = []
413 mgmt_cp = None
414 if vnfd.get("vnf-configuration"):
415 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
416 if ssh_required and vnfd.get("mgmt-interface"):
417 if vnfd["mgmt-interface"].get("vdu-id"):
418 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
419 elif vnfd["mgmt-interface"].get("cp"):
420 mgmt_cp = vnfd["mgmt-interface"]["cp"]
421
422 for vdu in vnfd.get("vdu", ()):
423 if vdu.get("vdu-configuration"):
424 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
425 if ssh_required:
426 vdu_needed_access.append(vdu["id"])
427 elif mgmt_cp:
428 for vdu_interface in vdu.get("interface"):
429 if vdu_interface.get("external-connection-point-ref") and \
430 vdu_interface["external-connection-point-ref"] == mgmt_cp:
431 vdu_needed_access.append(vdu["id"])
432 mgmt_cp = None
433 break
434
435 if vdu_needed_access:
436 for vnf_member in nsd.get("constituent-vnfd"):
437 if vnf_member["vnfd-id-ref"] != vnfd_ref:
438 continue
439 for vdu in vdu_needed_access:
440 populate_dict(RO_ns_params,
441 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
442 n2vc_key_list)
443
444 if ns_params.get("vduImage"):
445 RO_ns_params["vduImage"] = ns_params["vduImage"]
446
447 if ns_params.get("ssh_keys"):
448 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
449 for vnf_params in get_iterable(ns_params, "vnf"):
450 for constituent_vnfd in nsd["constituent-vnfd"]:
451 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
452 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
453 break
454 else:
455 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
456 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
457
458 for vdu_params in get_iterable(vnf_params, "vdu"):
459 # TODO feature 1417: check that this VDU exist and it is not a PDU
460 if vdu_params.get("volume"):
461 for volume_params in vdu_params["volume"]:
462 if volume_params.get("vim-volume-id"):
463 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
464 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
465 volume_params["vim-volume-id"])
466 if vdu_params.get("interface"):
467 for interface_params in vdu_params["interface"]:
468 if interface_params.get("ip-address"):
469 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
470 vdu_params["id"], "interfaces", interface_params["name"],
471 "ip_address"),
472 interface_params["ip-address"])
473 if interface_params.get("mac-address"):
474 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
475 vdu_params["id"], "interfaces", interface_params["name"],
476 "mac_address"),
477 interface_params["mac-address"])
478 if interface_params.get("floating-ip-required"):
479 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
480 vdu_params["id"], "interfaces", interface_params["name"],
481 "floating-ip"),
482 interface_params["floating-ip-required"])
483
484 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
485 if internal_vld_params.get("vim-network-name"):
486 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
487 internal_vld_params["name"], "vim-network-name"),
488 internal_vld_params["vim-network-name"])
489 if internal_vld_params.get("vim-network-id"):
490 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
491 internal_vld_params["name"], "vim-network-id"),
492 internal_vld_params["vim-network-id"])
493 if internal_vld_params.get("ip-profile"):
494 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
495 internal_vld_params["name"], "ip-profile"),
496 ip_profile_2_RO(internal_vld_params["ip-profile"]))
497 if internal_vld_params.get("provider-network"):
498
499 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
500 internal_vld_params["name"], "provider-network"),
501 internal_vld_params["provider-network"].copy())
502
503 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
504 # look for interface
505 iface_found = False
506 for vdu_descriptor in vnf_descriptor["vdu"]:
507 for vdu_interface in vdu_descriptor["interface"]:
508 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
509 if icp_params.get("ip-address"):
510 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
511 vdu_descriptor["id"], "interfaces",
512 vdu_interface["name"], "ip_address"),
513 icp_params["ip-address"])
514
515 if icp_params.get("mac-address"):
516 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
517 vdu_descriptor["id"], "interfaces",
518 vdu_interface["name"], "mac_address"),
519 icp_params["mac-address"])
520 iface_found = True
521 break
522 if iface_found:
523 break
524 else:
525 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
526 "internal-vld:id-ref={} is not present at vnfd:internal-"
527 "connection-point".format(vnf_params["member-vnf-index"],
528 icp_params["id-ref"]))
529
530 for vld_params in get_iterable(ns_params, "vld"):
531 if "ip-profile" in vld_params:
532 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
533 ip_profile_2_RO(vld_params["ip-profile"]))
534
535 if vld_params.get("provider-network"):
536
537 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
538 vld_params["provider-network"].copy())
539
540 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
541 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
542 wim_account_2_RO(vld_params["wimAccountId"])),
543 if vld_params.get("vim-network-name"):
544 RO_vld_sites = []
545 if isinstance(vld_params["vim-network-name"], dict):
546 for vim_account, vim_net in vld_params["vim-network-name"].items():
547 RO_vld_sites.append({
548 "netmap-use": vim_net,
549 "datacenter": vim_account_2_RO(vim_account)
550 })
551 else: # isinstance str
552 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
553 if RO_vld_sites:
554 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
555
556 if vld_params.get("vim-network-id"):
557 RO_vld_sites = []
558 if isinstance(vld_params["vim-network-id"], dict):
559 for vim_account, vim_net in vld_params["vim-network-id"].items():
560 RO_vld_sites.append({
561 "netmap-use": vim_net,
562 "datacenter": vim_account_2_RO(vim_account)
563 })
564 else: # isinstance str
565 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
566 if RO_vld_sites:
567 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
568 if vld_params.get("ns-net"):
569 if isinstance(vld_params["ns-net"], dict):
570 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
571 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
572 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
573 if "vnfd-connection-point-ref" in vld_params:
574 for cp_params in vld_params["vnfd-connection-point-ref"]:
575 # look for interface
576 for constituent_vnfd in nsd["constituent-vnfd"]:
577 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
578 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
579 break
580 else:
581 raise LcmException(
582 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
583 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
584 match_cp = False
585 for vdu_descriptor in vnf_descriptor["vdu"]:
586 for interface_descriptor in vdu_descriptor["interface"]:
587 if interface_descriptor.get("external-connection-point-ref") == \
588 cp_params["vnfd-connection-point-ref"]:
589 match_cp = True
590 break
591 if match_cp:
592 break
593 else:
594 raise LcmException(
595 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
596 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
597 cp_params["member-vnf-index-ref"],
598 cp_params["vnfd-connection-point-ref"],
599 vnf_descriptor["id"]))
600 if cp_params.get("ip-address"):
601 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
602 vdu_descriptor["id"], "interfaces",
603 interface_descriptor["name"], "ip_address"),
604 cp_params["ip-address"])
605 if cp_params.get("mac-address"):
606 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
607 vdu_descriptor["id"], "interfaces",
608 interface_descriptor["name"], "mac_address"),
609 cp_params["mac-address"])
610 return RO_ns_params
611
612 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
613 # make a copy to do not change
614 vdu_create = copy(vdu_create)
615 vdu_delete = copy(vdu_delete)
616
617 vdurs = db_vnfr.get("vdur")
618 if vdurs is None:
619 vdurs = []
620 vdu_index = len(vdurs)
621 while vdu_index:
622 vdu_index -= 1
623 vdur = vdurs[vdu_index]
624 if vdur.get("pdu-type"):
625 continue
626 vdu_id_ref = vdur["vdu-id-ref"]
627 if vdu_create and vdu_create.get(vdu_id_ref):
628 for index in range(0, vdu_create[vdu_id_ref]):
629 vdur = deepcopy(vdur)
630 vdur["_id"] = str(uuid4())
631 vdur["count-index"] += 1
632 vdurs.insert(vdu_index+1+index, vdur)
633 del vdu_create[vdu_id_ref]
634 if vdu_delete and vdu_delete.get(vdu_id_ref):
635 del vdurs[vdu_index]
636 vdu_delete[vdu_id_ref] -= 1
637 if not vdu_delete[vdu_id_ref]:
638 del vdu_delete[vdu_id_ref]
639 # check all operations are done
640 if vdu_create or vdu_delete:
641 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
642 vdu_create))
643 if vdu_delete:
644 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
645 vdu_delete))
646
647 vnfr_update = {"vdur": vdurs}
648 db_vnfr["vdur"] = vdurs
649 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
650
651 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
652 """
653 Updates database nsr with the RO info for the created vld
654 :param ns_update_nsr: dictionary to be filled with the updated info
655 :param db_nsr: content of db_nsr. This is also modified
656 :param nsr_desc_RO: nsr descriptor from RO
657 :return: Nothing, LcmException is raised on errors
658 """
659
660 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
661 for net_RO in get_iterable(nsr_desc_RO, "nets"):
662 if vld["id"] != net_RO.get("ns_net_osm_id"):
663 continue
664 vld["vim-id"] = net_RO.get("vim_net_id")
665 vld["name"] = net_RO.get("vim_name")
666 vld["status"] = net_RO.get("status")
667 vld["status-detailed"] = net_RO.get("error_msg")
668 ns_update_nsr["vld.{}".format(vld_index)] = vld
669 break
670 else:
671 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
672
673 def set_vnfr_at_error(self, db_vnfrs, error_text):
674 try:
675 for db_vnfr in db_vnfrs.values():
676 vnfr_update = {"status": "ERROR"}
677 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
678 if "status" not in vdur:
679 vdur["status"] = "ERROR"
680 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
681 if error_text:
682 vdur["status-detailed"] = str(error_text)
683 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
684 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
685 except DbException as e:
686 self.logger.error("Cannot update vnf. {}".format(e))
687
688 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
689 """
690 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
691 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
692 :param nsr_desc_RO: nsr descriptor from RO
693 :return: Nothing, LcmException is raised on errors
694 """
695 for vnf_index, db_vnfr in db_vnfrs.items():
696 for vnf_RO in nsr_desc_RO["vnfs"]:
697 if vnf_RO["member_vnf_index"] != vnf_index:
698 continue
699 vnfr_update = {}
700 if vnf_RO.get("ip_address"):
701 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
702 elif not db_vnfr.get("ip-address"):
703 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
704 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
705
706 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
707 vdur_RO_count_index = 0
708 if vdur.get("pdu-type"):
709 continue
710 for vdur_RO in get_iterable(vnf_RO, "vms"):
711 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
712 continue
713 if vdur["count-index"] != vdur_RO_count_index:
714 vdur_RO_count_index += 1
715 continue
716 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
717 if vdur_RO.get("ip_address"):
718 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
719 else:
720 vdur["ip-address"] = None
721 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
722 vdur["name"] = vdur_RO.get("vim_name")
723 vdur["status"] = vdur_RO.get("status")
724 vdur["status-detailed"] = vdur_RO.get("error_msg")
725 for ifacer in get_iterable(vdur, "interfaces"):
726 for interface_RO in get_iterable(vdur_RO, "interfaces"):
727 if ifacer["name"] == interface_RO.get("internal_name"):
728 ifacer["ip-address"] = interface_RO.get("ip_address")
729 ifacer["mac-address"] = interface_RO.get("mac_address")
730 break
731 else:
732 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
733 "from VIM info"
734 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
735 vnfr_update["vdur.{}".format(vdu_index)] = vdur
736 break
737 else:
738 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
739 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
740
741 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
742 for net_RO in get_iterable(nsr_desc_RO, "nets"):
743 if vld["id"] != net_RO.get("vnf_net_osm_id"):
744 continue
745 vld["vim-id"] = net_RO.get("vim_net_id")
746 vld["name"] = net_RO.get("vim_name")
747 vld["status"] = net_RO.get("status")
748 vld["status-detailed"] = net_RO.get("error_msg")
749 vnfr_update["vld.{}".format(vld_index)] = vld
750 break
751 else:
752 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
753 vnf_index, vld["id"]))
754
755 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
756 break
757
758 else:
759 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
760
761 def _get_ns_config_info(self, nsr_id):
762 """
763 Generates a mapping between vnf,vdu elements and the N2VC id
764 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
765 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
766 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
767 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
768 """
769 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
770 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
771 mapping = {}
772 ns_config_info = {"osm-config-mapping": mapping}
773 for vca in vca_deployed_list:
774 if not vca["member-vnf-index"]:
775 continue
776 if not vca["vdu_id"]:
777 mapping[vca["member-vnf-index"]] = vca["application"]
778 else:
779 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
780 vca["application"]
781 return ns_config_info
782
783 @staticmethod
784 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
785 """
786 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
787 primitives as verify-ssh-credentials, or config when needed
788 :param desc_primitive_list: information of the descriptor
789 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
790 this element contains a ssh public key
791 :return: The modified list. Can ba an empty list, but always a list
792 """
793 if desc_primitive_list:
794 primitive_list = desc_primitive_list.copy()
795 else:
796 primitive_list = []
797 # look for primitive config, and get the position. None if not present
798 config_position = None
799 for index, primitive in enumerate(primitive_list):
800 if primitive["name"] == "config":
801 config_position = index
802 break
803
804 # for NS, add always a config primitive if not present (bug 874)
805 if not vca_deployed["member-vnf-index"] and config_position is None:
806 primitive_list.insert(0, {"name": "config", "parameter": []})
807 config_position = 0
808 # for VNF/VDU add verify-ssh-credentials after config
809 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
810 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
811 return primitive_list
812
813 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
814 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
815 nslcmop_id = db_nslcmop["_id"]
816 target = {
817 "name": db_nsr["name"],
818 "ns": {"vld": []},
819 "vnf": [],
820 "image": deepcopy(db_nsr["image"]),
821 "flavor": deepcopy(db_nsr["flavor"]),
822 "action_id": nslcmop_id,
823 }
824 for image in target["image"]:
825 image["vim_info"] = []
826 for flavor in target["flavor"]:
827 flavor["vim_info"] = []
828
829 ns_params = db_nslcmop.get("operationParams")
830 ssh_keys = []
831 if ns_params.get("ssh_keys"):
832 ssh_keys += ns_params.get("ssh_keys")
833 if n2vc_key_list:
834 ssh_keys += n2vc_key_list
835
836 cp2target = {}
837 for vld_index, vld in enumerate(nsd.get("vld")):
838 target_vld = {"id": vld["id"],
839 "name": vld["name"],
840 "mgmt-network": vld.get("mgmt-network", False),
841 "type": vld.get("type"),
842 "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
843 "vim_account_id": ns_params["vimAccountId"]}],
844 }
845 for cp in vld["vnfd-connection-point-ref"]:
846 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
847 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
848 target["ns"]["vld"].append(target_vld)
849 for vnfr in db_vnfrs.values():
850 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
851 target_vnf = deepcopy(vnfr)
852 for vld in target_vnf.get("vld", ()):
853 # check if connected to a ns.vld
854 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
855 cp.get("internal-vld-ref") == vld["id"]), None)
856 if vnf_cp:
857 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
858 if cp2target.get(ns_cp):
859 vld["target"] = cp2target[ns_cp]
860 vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
861 "vim_account_id": vnfr["vim-account-id"]}]
862
863 for vdur in target_vnf.get("vdur", ()):
864 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
865 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
866 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
867
868 if ssh_keys:
869 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
870 vdur["ssh-keys"] = ssh_keys
871 vdur["ssh-access-required"] = True
872 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
873 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
874 vdur["ssh-keys"] = ssh_keys
875 vdur["ssh-access-required"] = True
876
877 # cloud-init
878 if vdud.get("cloud-init-file"):
879 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
880 elif vdud.get("cloud-init"):
881 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
882
883 # flavor
884 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
885 if not next((vi for vi in ns_flavor["vim_info"] if
886 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
887 ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
888 # image
889 ns_image = target["image"][int(vdur["ns-image-id"])]
890 if not next((vi for vi in ns_image["vim_info"] if
891 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
892 ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
893
894 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
895 target["vnf"].append(target_vnf)
896
897 desc = await self.RO.deploy(nsr_id, target)
898 action_id = desc["action_id"]
899 await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
900
901 # Updating NSR
902 db_nsr_update = {
903 "_admin.deployed.RO.operational-status": "running",
904 "detailed-status": " ".join(stage)
905 }
906 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
907 self.update_db_2("nsrs", nsr_id, db_nsr_update)
908 self._write_op_status(nslcmop_id, stage)
909 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
910 return
911
912 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
913 detailed_status_old = None
914 db_nsr_update = {}
915 while time() <= start_time + timeout:
916 desc_status = await self.RO.status(nsr_id, action_id)
917 if desc_status["status"] == "FAILED":
918 raise NgRoException(desc_status["details"])
919 elif desc_status["status"] == "BUILD":
920 stage[2] = "VIM: ({})".format(desc_status["details"])
921 elif desc_status["status"] == "DONE":
922 stage[2] = "Deployed at VIM"
923 break
924 else:
925 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
926 if stage[2] != detailed_status_old:
927 detailed_status_old = stage[2]
928 db_nsr_update["detailed-status"] = " ".join(stage)
929 self.update_db_2("nsrs", nsr_id, db_nsr_update)
930 self._write_op_status(nslcmop_id, stage)
931 await asyncio.sleep(5, loop=self.loop)
932 else: # timeout_ns_deploy
933 raise NgRoException("Timeout waiting ns to deploy")
934
935 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
936 db_nsr_update = {}
937 failed_detail = []
938 action_id = None
939 start_deploy = time()
940 try:
941 target = {
942 "ns": {"vld": []},
943 "vnf": [],
944 "image": [],
945 "flavor": [],
946 }
947 desc = await self.RO.deploy(nsr_id, target)
948 action_id = desc["action_id"]
949 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
950 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
951 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
952
953 # wait until done
954 delete_timeout = 20 * 60 # 20 minutes
955 await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
956
957 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
958 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
959 # delete all nsr
960 await self.RO.delete(nsr_id)
961 except Exception as e:
962 if isinstance(e, NgRoException) and e.http_code == 404: # not found
963 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
964 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
965 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
966 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
967 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
968 failed_detail.append("delete conflict: {}".format(e))
969 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
970 else:
971 failed_detail.append("delete error: {}".format(e))
972 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
973
974 if failed_detail:
975 stage[2] = "Error deleting from VIM"
976 else:
977 stage[2] = "Deleted from VIM"
978 db_nsr_update["detailed-status"] = " ".join(stage)
979 self.update_db_2("nsrs", nsr_id, db_nsr_update)
980 self._write_op_status(nslcmop_id, stage)
981
982 if failed_detail:
983 raise LcmException("; ".join(failed_detail))
984 return
985
986 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
987 n2vc_key_list, stage):
988 """
989 Instantiate at RO
990 :param logging_text: preffix text to use at logging
991 :param nsr_id: nsr identity
992 :param nsd: database content of ns descriptor
993 :param db_nsr: database content of ns record
994 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
995 :param db_vnfrs:
996 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
997 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
998 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
999 :return: None or exception
1000 """
1001 try:
1002 db_nsr_update = {}
1003 RO_descriptor_number = 0 # number of descriptors created at RO
1004 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1005 nslcmop_id = db_nslcmop["_id"]
1006 start_deploy = time()
1007 ns_params = db_nslcmop.get("operationParams")
1008 if ns_params and ns_params.get("timeout_ns_deploy"):
1009 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1010 else:
1011 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1012
1013 # Check for and optionally request placement optimization. Database will be updated if placement activated
1014 stage[2] = "Waiting for Placement."
1015 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1016 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1017 for vnfr in db_vnfrs.values():
1018 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1019 break
1020 else:
1021 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1022
1023 if self.ng_ro:
1024 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1025 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1026 timeout_ns_deploy)
1027 # deploy RO
1028 # get vnfds, instantiate at RO
1029 for c_vnf in nsd.get("constituent-vnfd", ()):
1030 member_vnf_index = c_vnf["member-vnf-index"]
1031 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1032 vnfd_ref = vnfd["id"]
1033
1034 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1035 db_nsr_update["detailed-status"] = " ".join(stage)
1036 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1037 self._write_op_status(nslcmop_id, stage)
1038
1039 # self.logger.debug(logging_text + stage[2])
1040 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1041 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1042 RO_descriptor_number += 1
1043
1044 # look position at deployed.RO.vnfd if not present it will be appended at the end
1045 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1046 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1047 break
1048 else:
1049 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1050 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1051
1052 # look if present
1053 RO_update = {"member-vnf-index": member_vnf_index}
1054 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1055 if vnfd_list:
1056 RO_update["id"] = vnfd_list[0]["uuid"]
1057 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1058 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1059 else:
1060 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1061 get("additionalParamsForVnf"), nsr_id)
1062 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1063 RO_update["id"] = desc["uuid"]
1064 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1065 vnfd_ref, member_vnf_index, desc["uuid"]))
1066 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1067 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1068
1069 # create nsd at RO
1070 nsd_ref = nsd["id"]
1071
1072 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1073 db_nsr_update["detailed-status"] = " ".join(stage)
1074 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1075 self._write_op_status(nslcmop_id, stage)
1076
1077 # self.logger.debug(logging_text + stage[2])
1078 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1079 RO_descriptor_number += 1
1080 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1081 if nsd_list:
1082 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1083 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1084 nsd_ref, RO_nsd_uuid))
1085 else:
1086 nsd_RO = deepcopy(nsd)
1087 nsd_RO["id"] = RO_osm_nsd_id
1088 nsd_RO.pop("_id", None)
1089 nsd_RO.pop("_admin", None)
1090 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1091 member_vnf_index = c_vnf["member-vnf-index"]
1092 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1093 for c_vld in nsd_RO.get("vld", ()):
1094 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1095 member_vnf_index = cp["member-vnf-index-ref"]
1096 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1097
1098 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1099 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1100 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1101 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1102 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1103
1104 # Crate ns at RO
1105 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1106 db_nsr_update["detailed-status"] = " ".join(stage)
1107 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1108 self._write_op_status(nslcmop_id, stage)
1109
1110 # if present use it unless in error status
1111 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1112 if RO_nsr_id:
1113 try:
1114 stage[2] = "Looking for existing ns at RO"
1115 db_nsr_update["detailed-status"] = " ".join(stage)
1116 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1117 self._write_op_status(nslcmop_id, stage)
1118 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1119 desc = await self.RO.show("ns", RO_nsr_id)
1120
1121 except ROclient.ROClientException as e:
1122 if e.http_code != HTTPStatus.NOT_FOUND:
1123 raise
1124 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1125 if RO_nsr_id:
1126 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1127 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1128 if ns_status == "ERROR":
1129 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1130 self.logger.debug(logging_text + stage[2])
1131 await self.RO.delete("ns", RO_nsr_id)
1132 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1133 if not RO_nsr_id:
1134 stage[2] = "Checking dependencies"
1135 db_nsr_update["detailed-status"] = " ".join(stage)
1136 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1137 self._write_op_status(nslcmop_id, stage)
1138 # self.logger.debug(logging_text + stage[2])
1139
1140 # check if VIM is creating and wait look if previous tasks in process
1141 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1142 if task_dependency:
1143 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1144 self.logger.debug(logging_text + stage[2])
1145 await asyncio.wait(task_dependency, timeout=3600)
1146 if ns_params.get("vnf"):
1147 for vnf in ns_params["vnf"]:
1148 if "vimAccountId" in vnf:
1149 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1150 vnf["vimAccountId"])
1151 if task_dependency:
1152 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1153 self.logger.debug(logging_text + stage[2])
1154 await asyncio.wait(task_dependency, timeout=3600)
1155
1156 stage[2] = "Checking instantiation parameters."
1157 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1158 stage[2] = "Deploying ns at VIM."
1159 db_nsr_update["detailed-status"] = " ".join(stage)
1160 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1161 self._write_op_status(nslcmop_id, stage)
1162
1163 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1164 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1165 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1166 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1167 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1168
1169 # wait until NS is ready
1170 stage[2] = "Waiting VIM to deploy ns."
1171 db_nsr_update["detailed-status"] = " ".join(stage)
1172 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1173 self._write_op_status(nslcmop_id, stage)
1174 detailed_status_old = None
1175 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1176
1177 old_desc = None
1178 while time() <= start_deploy + timeout_ns_deploy:
1179 desc = await self.RO.show("ns", RO_nsr_id)
1180
1181 # deploymentStatus
1182 if desc != old_desc:
1183 # desc has changed => update db
1184 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1185 old_desc = desc
1186
1187 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1188 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1189 if ns_status == "ERROR":
1190 raise ROclient.ROClientException(ns_status_info)
1191 elif ns_status == "BUILD":
1192 stage[2] = "VIM: ({})".format(ns_status_info)
1193 elif ns_status == "ACTIVE":
1194 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1195 try:
1196 self.ns_update_vnfr(db_vnfrs, desc)
1197 break
1198 except LcmExceptionNoMgmtIP:
1199 pass
1200 else:
1201 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1202 if stage[2] != detailed_status_old:
1203 detailed_status_old = stage[2]
1204 db_nsr_update["detailed-status"] = " ".join(stage)
1205 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1206 self._write_op_status(nslcmop_id, stage)
1207 await asyncio.sleep(5, loop=self.loop)
1208 else: # timeout_ns_deploy
1209 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1210
1211 # Updating NSR
1212 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1213
1214 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1215 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1216 stage[2] = "Deployed at VIM"
1217 db_nsr_update["detailed-status"] = " ".join(stage)
1218 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1219 self._write_op_status(nslcmop_id, stage)
1220 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1221 # self.logger.debug(logging_text + "Deployed at VIM")
1222 except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
1223 stage[2] = "ERROR deploying at VIM"
1224 self.set_vnfr_at_error(db_vnfrs, str(e))
1225 raise
1226
1227 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1228 """
1229 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1230 :param logging_text: prefix use for logging
1231 :param nsr_id:
1232 :param vnfr_id:
1233 :param vdu_id:
1234 :param vdu_index:
1235 :param pub_key: public ssh key to inject, None to skip
1236 :param user: user to apply the public ssh key
1237 :return: IP address
1238 """
1239
1240 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1241 ro_nsr_id = None
1242 ip_address = None
1243 nb_tries = 0
1244 target_vdu_id = None
1245 ro_retries = 0
1246
1247 while True:
1248
1249 ro_retries += 1
1250 if ro_retries >= 360: # 1 hour
1251 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1252
1253 await asyncio.sleep(10, loop=self.loop)
1254
1255 # get ip address
1256 if not target_vdu_id:
1257 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1258
1259 if not vdu_id: # for the VNF case
1260 if db_vnfr.get("status") == "ERROR":
1261 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1262 ip_address = db_vnfr.get("ip-address")
1263 if not ip_address:
1264 continue
1265 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1266 else: # VDU case
1267 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1268 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1269
1270 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1271 vdur = db_vnfr["vdur"][0]
1272 if not vdur:
1273 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1274 vdu_index))
1275
1276 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1277 ip_address = vdur.get("ip-address")
1278 if not ip_address:
1279 continue
1280 target_vdu_id = vdur["vdu-id-ref"]
1281 elif vdur.get("status") == "ERROR":
1282 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1283
1284 if not target_vdu_id:
1285 continue
1286
1287 # inject public key into machine
1288 if pub_key and user:
1289 # wait until NS is deployed at RO
1290 if not ro_nsr_id:
1291 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1292 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1293 if not ro_nsr_id:
1294 continue
1295
1296 # self.logger.debug(logging_text + "Inserting RO key")
1297 if vdur.get("pdu-type"):
1298 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1299 return ip_address
1300 try:
1301 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1302 if self.ng_ro:
1303 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1304 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
1305 }
1306 await self.RO.deploy(nsr_id, target)
1307 else:
1308 result_dict = await self.RO.create_action(
1309 item="ns",
1310 item_id_name=ro_nsr_id,
1311 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1312 )
1313 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1314 if not result_dict or not isinstance(result_dict, dict):
1315 raise LcmException("Unknown response from RO when injecting key")
1316 for result in result_dict.values():
1317 if result.get("vim_result") == 200:
1318 break
1319 else:
1320 raise ROclient.ROClientException("error injecting key: {}".format(
1321 result.get("description")))
1322 break
1323 except NgRoException as e:
1324 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1325 except ROclient.ROClientException as e:
1326 if not nb_tries:
1327 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1328 format(e, 20*10))
1329 nb_tries += 1
1330 if nb_tries >= 20:
1331 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1332 else:
1333 break
1334
1335 return ip_address
1336
1337 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1338 """
1339 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1340 """
1341 my_vca = vca_deployed_list[vca_index]
1342 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1343 # vdu or kdu: no dependencies
1344 return
1345 timeout = 300
1346 while timeout >= 0:
1347 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1348 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1349 configuration_status_list = db_nsr["configurationStatus"]
1350 for index, vca_deployed in enumerate(configuration_status_list):
1351 if index == vca_index:
1352 # myself
1353 continue
1354 if not my_vca.get("member-vnf-index") or \
1355 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1356 internal_status = configuration_status_list[index].get("status")
1357 if internal_status == 'READY':
1358 continue
1359 elif internal_status == 'BROKEN':
1360 raise LcmException("Configuration aborted because dependent charm/s has failed")
1361 else:
1362 break
1363 else:
1364 # no dependencies, return
1365 return
1366 await asyncio.sleep(10)
1367 timeout -= 1
1368
1369 raise LcmException("Configuration aborted because dependent charm/s timeout")
1370
1371 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1372 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name):
1373 nsr_id = db_nsr["_id"]
1374 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1375 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1376 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1377 db_dict = {
1378 'collection': 'nsrs',
1379 'filter': {'_id': nsr_id},
1380 'path': db_update_entry
1381 }
1382 step = ""
1383 try:
1384
1385 element_type = 'NS'
1386 element_under_configuration = nsr_id
1387
1388 vnfr_id = None
1389 if db_vnfr:
1390 vnfr_id = db_vnfr["_id"]
1391
1392 namespace = "{nsi}.{ns}".format(
1393 nsi=nsi_id if nsi_id else "",
1394 ns=nsr_id)
1395
1396 if vnfr_id:
1397 element_type = 'VNF'
1398 element_under_configuration = vnfr_id
1399 namespace += ".{}".format(vnfr_id)
1400 if vdu_id:
1401 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1402 element_type = 'VDU'
1403 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1404 elif kdu_name:
1405 namespace += ".{}".format(kdu_name)
1406 element_type = 'KDU'
1407 element_under_configuration = kdu_name
1408
1409 # Get artifact path
1410 artifact_path = "{}/{}/{}/{}".format(
1411 base_folder["folder"],
1412 base_folder["pkg-dir"],
1413 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1414 vca_name
1415 )
1416
1417 # n2vc_redesign STEP 3.1
1418
1419 # find old ee_id if exists
1420 ee_id = vca_deployed.get("ee_id")
1421
1422 # create or register execution environment in VCA
1423 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1424
1425 self._write_configuration_status(
1426 nsr_id=nsr_id,
1427 vca_index=vca_index,
1428 status='CREATING',
1429 element_under_configuration=element_under_configuration,
1430 element_type=element_type
1431 )
1432
1433 step = "create execution environment"
1434 self.logger.debug(logging_text + step)
1435 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1436 namespace=namespace,
1437 reuse_ee_id=ee_id,
1438 db_dict=db_dict,
1439 artifact_path=artifact_path,
1440 vca_type=vca_type)
1441
1442 elif vca_type == "native_charm":
1443 step = "Waiting to VM being up and getting IP address"
1444 self.logger.debug(logging_text + step)
1445 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1446 user=None, pub_key=None)
1447 credentials = {"hostname": rw_mgmt_ip}
1448 # get username
1449 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1450 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1451 # merged. Meanwhile let's get username from initial-config-primitive
1452 if not username and config_descriptor.get("initial-config-primitive"):
1453 for config_primitive in config_descriptor["initial-config-primitive"]:
1454 for param in config_primitive.get("parameter", ()):
1455 if param["name"] == "ssh-username":
1456 username = param["value"]
1457 break
1458 if not username:
1459 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1460 "'config-access.ssh-access.default-user'")
1461 credentials["username"] = username
1462 # n2vc_redesign STEP 3.2
1463
1464 self._write_configuration_status(
1465 nsr_id=nsr_id,
1466 vca_index=vca_index,
1467 status='REGISTERING',
1468 element_under_configuration=element_under_configuration,
1469 element_type=element_type
1470 )
1471
1472 step = "register execution environment {}".format(credentials)
1473 self.logger.debug(logging_text + step)
1474 ee_id = await self.vca_map[vca_type].register_execution_environment(
1475 credentials=credentials, namespace=namespace, db_dict=db_dict)
1476
1477 # for compatibility with MON/POL modules, the need model and application name at database
1478 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1479 ee_id_parts = ee_id.split('.')
1480 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1481 if len(ee_id_parts) >= 2:
1482 model_name = ee_id_parts[0]
1483 application_name = ee_id_parts[1]
1484 db_nsr_update[db_update_entry + "model"] = model_name
1485 db_nsr_update[db_update_entry + "application"] = application_name
1486
1487 # n2vc_redesign STEP 3.3
1488 step = "Install configuration Software"
1489
1490 self._write_configuration_status(
1491 nsr_id=nsr_id,
1492 vca_index=vca_index,
1493 status='INSTALLING SW',
1494 element_under_configuration=element_under_configuration,
1495 element_type=element_type,
1496 other_update=db_nsr_update
1497 )
1498
1499 # TODO check if already done
1500 self.logger.debug(logging_text + step)
1501 config = None
1502 if vca_type == "native_charm":
1503 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1504 if initial_config_primitive_list:
1505 for primitive in initial_config_primitive_list:
1506 if primitive["name"] == "config":
1507 config = self._map_primitive_params(
1508 primitive,
1509 {},
1510 deploy_params
1511 )
1512 break
1513 num_units = 1
1514 if vca_type == "lxc_proxy_charm":
1515 if element_type == "NS":
1516 num_units = db_nsr.get("config-units") or 1
1517 elif element_type == "VNF":
1518 num_units = db_vnfr.get("config-units") or 1
1519 elif element_type == "VDU":
1520 for v in db_vnfr["vdur"]:
1521 if vdu_id == v["vdu-id-ref"]:
1522 num_units = v.get("config-units") or 1
1523 break
1524
1525 await self.vca_map[vca_type].install_configuration_sw(
1526 ee_id=ee_id,
1527 artifact_path=artifact_path,
1528 db_dict=db_dict,
1529 config=config,
1530 num_units=num_units,
1531 vca_type=vca_type
1532 )
1533
1534 # write in db flag of configuration_sw already installed
1535 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1536
1537 # add relations for this VCA (wait for other peers related with this VCA)
1538 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1539 vca_index=vca_index, vca_type=vca_type)
1540
1541 # if SSH access is required, then get execution environment SSH public
1542 if vca_type in ("lxc_proxy_charm", "helm"): # if native charm we have waited already to VM be UP
1543 pub_key = None
1544 user = None
1545 # self.logger.debug("get ssh key block")
1546 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1547 # self.logger.debug("ssh key needed")
1548 # Needed to inject a ssh key
1549 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1550 step = "Install configuration Software, getting public ssh key"
1551 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1552
1553 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1554 else:
1555 # self.logger.debug("no need to get ssh key")
1556 step = "Waiting to VM being up and getting IP address"
1557 self.logger.debug(logging_text + step)
1558
1559 # n2vc_redesign STEP 5.1
1560 # wait for RO (ip-address) Insert pub_key into VM
1561 if vnfr_id:
1562 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1563 user=user, pub_key=pub_key)
1564 else:
1565 rw_mgmt_ip = None # This is for a NS configuration
1566
1567 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1568
1569 # store rw_mgmt_ip in deploy params for later replacement
1570 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1571
1572 # n2vc_redesign STEP 6 Execute initial config primitive
1573 step = 'execute initial config primitive'
1574 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1575
1576 # sort initial config primitives by 'seq'
1577 if initial_config_primitive_list:
1578 try:
1579 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1580 except Exception as e:
1581 self.logger.error(logging_text + step + ": " + str(e))
1582 else:
1583 self.logger.debug(logging_text + step + ": No initial-config-primitive")
1584
1585 # add config if not present for NS charm
1586 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1587 vca_deployed)
1588
1589 # wait for dependent primitives execution (NS -> VNF -> VDU)
1590 if initial_config_primitive_list:
1591 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1592
1593 # stage, in function of element type: vdu, kdu, vnf or ns
1594 my_vca = vca_deployed_list[vca_index]
1595 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1596 # VDU or KDU
1597 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1598 elif my_vca.get("member-vnf-index"):
1599 # VNF
1600 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1601 else:
1602 # NS
1603 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1604
1605 self._write_configuration_status(
1606 nsr_id=nsr_id,
1607 vca_index=vca_index,
1608 status='EXECUTING PRIMITIVE'
1609 )
1610
1611 self._write_op_status(
1612 op_id=nslcmop_id,
1613 stage=stage
1614 )
1615
1616 check_if_terminated_needed = True
1617 for initial_config_primitive in initial_config_primitive_list:
1618 # adding information on the vca_deployed if it is a NS execution environment
1619 if not vca_deployed["member-vnf-index"]:
1620 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1621 # TODO check if already done
1622 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1623
1624 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1625 self.logger.debug(logging_text + step)
1626 await self.vca_map[vca_type].exec_primitive(
1627 ee_id=ee_id,
1628 primitive_name=initial_config_primitive["name"],
1629 params_dict=primitive_params_,
1630 db_dict=db_dict
1631 )
1632 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1633 if check_if_terminated_needed:
1634 if config_descriptor.get('terminate-config-primitive'):
1635 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1636 check_if_terminated_needed = False
1637
1638 # TODO register in database that primitive is done
1639
1640 step = "instantiated at VCA"
1641 self.logger.debug(logging_text + step)
1642
1643 self._write_configuration_status(
1644 nsr_id=nsr_id,
1645 vca_index=vca_index,
1646 status='READY'
1647 )
1648
1649 except Exception as e: # TODO not use Exception but N2VC exception
1650 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1651 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1652 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1653 self._write_configuration_status(
1654 nsr_id=nsr_id,
1655 vca_index=vca_index,
1656 status='BROKEN'
1657 )
1658 raise LcmException("{} {}".format(step, e)) from e
1659
1660 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1661 error_description: str = None, error_detail: str = None, other_update: dict = None):
1662 """
1663 Update db_nsr fields.
1664 :param nsr_id:
1665 :param ns_state:
1666 :param current_operation:
1667 :param current_operation_id:
1668 :param error_description:
1669 :param error_detail:
1670 :param other_update: Other required changes at database if provided, will be cleared
1671 :return:
1672 """
1673 try:
1674 db_dict = other_update or {}
1675 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1676 db_dict["_admin.current-operation"] = current_operation_id
1677 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1678 db_dict["currentOperation"] = current_operation
1679 db_dict["currentOperationID"] = current_operation_id
1680 db_dict["errorDescription"] = error_description
1681 db_dict["errorDetail"] = error_detail
1682
1683 if ns_state:
1684 db_dict["nsState"] = ns_state
1685 self.update_db_2("nsrs", nsr_id, db_dict)
1686 except DbException as e:
1687 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1688
1689 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1690 operation_state: str = None, other_update: dict = None):
1691 try:
1692 db_dict = other_update or {}
1693 db_dict['queuePosition'] = queuePosition
1694 if isinstance(stage, list):
1695 db_dict['stage'] = stage[0]
1696 db_dict['detailed-status'] = " ".join(stage)
1697 elif stage is not None:
1698 db_dict['stage'] = str(stage)
1699
1700 if error_message is not None:
1701 db_dict['errorMessage'] = error_message
1702 if operation_state is not None:
1703 db_dict['operationState'] = operation_state
1704 db_dict["statusEnteredTime"] = time()
1705 self.update_db_2("nslcmops", op_id, db_dict)
1706 except DbException as e:
1707 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1708
1709 def _write_all_config_status(self, db_nsr: dict, status: str):
1710 try:
1711 nsr_id = db_nsr["_id"]
1712 # configurationStatus
1713 config_status = db_nsr.get('configurationStatus')
1714 if config_status:
1715 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1716 enumerate(config_status) if v}
1717 # update status
1718 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1719
1720 except DbException as e:
1721 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1722
1723 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1724 element_under_configuration: str = None, element_type: str = None,
1725 other_update: dict = None):
1726
1727 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1728 # .format(vca_index, status))
1729
1730 try:
1731 db_path = 'configurationStatus.{}.'.format(vca_index)
1732 db_dict = other_update or {}
1733 if status:
1734 db_dict[db_path + 'status'] = status
1735 if element_under_configuration:
1736 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1737 if element_type:
1738 db_dict[db_path + 'elementType'] = element_type
1739 self.update_db_2("nsrs", nsr_id, db_dict)
1740 except DbException as e:
1741 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1742 .format(status, nsr_id, vca_index, e))
1743
1744 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1745 """
1746 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1747 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1748 Database is used because the result can be obtained from a different LCM worker in case of HA.
1749 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1750 :param db_nslcmop: database content of nslcmop
1751 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1752 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1753 computed 'vim-account-id'
1754 """
1755 modified = False
1756 nslcmop_id = db_nslcmop['_id']
1757 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1758 if placement_engine == "PLA":
1759 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1760 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1761 db_poll_interval = 5
1762 wait = db_poll_interval * 10
1763 pla_result = None
1764 while not pla_result and wait >= 0:
1765 await asyncio.sleep(db_poll_interval)
1766 wait -= db_poll_interval
1767 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1768 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1769
1770 if not pla_result:
1771 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1772
1773 for pla_vnf in pla_result['vnf']:
1774 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1775 if not pla_vnf.get('vimAccountId') or not vnfr:
1776 continue
1777 modified = True
1778 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1779 # Modifies db_vnfrs
1780 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1781 return modified
1782
1783 def update_nsrs_with_pla_result(self, params):
1784 try:
1785 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1786 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1787 except Exception as e:
1788 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1789
1790 async def instantiate(self, nsr_id, nslcmop_id):
1791 """
1792
1793 :param nsr_id: ns instance to deploy
1794 :param nslcmop_id: operation to run
1795 :return:
1796 """
1797
1798 # Try to lock HA task here
1799 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1800 if not task_is_locked_by_me:
1801 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1802 return
1803
1804 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1805 self.logger.debug(logging_text + "Enter")
1806
1807 # Sync from FSMongo
1808 self.fs.sync()
1809
1810 # get all needed from database
1811
1812 # database nsrs record
1813 db_nsr = None
1814
1815 # database nslcmops record
1816 db_nslcmop = None
1817
1818 # update operation on nsrs
1819 db_nsr_update = {}
1820 # update operation on nslcmops
1821 db_nslcmop_update = {}
1822
1823 nslcmop_operation_state = None
1824 db_vnfrs = {} # vnf's info indexed by member-index
1825 # n2vc_info = {}
1826 tasks_dict_info = {} # from task to info text
1827 exc = None
1828 error_list = []
1829 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1830 # ^ stage, step, VIM progress
1831 try:
1832 # wait for any previous tasks in process
1833 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1834
1835 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1836 stage[1] = "Reading from database,"
1837 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1838 db_nsr_update["detailed-status"] = "creating"
1839 db_nsr_update["operational-status"] = "init"
1840 self._write_ns_status(
1841 nsr_id=nsr_id,
1842 ns_state="BUILDING",
1843 current_operation="INSTANTIATING",
1844 current_operation_id=nslcmop_id,
1845 other_update=db_nsr_update
1846 )
1847 self._write_op_status(
1848 op_id=nslcmop_id,
1849 stage=stage,
1850 queuePosition=0
1851 )
1852
1853 # read from db: operation
1854 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1855 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1856 ns_params = db_nslcmop.get("operationParams")
1857 if ns_params and ns_params.get("timeout_ns_deploy"):
1858 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1859 else:
1860 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1861
1862 # read from db: ns
1863 stage[1] = "Getting nsr={} from db".format(nsr_id)
1864 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1865 stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"])
1866 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1867 db_nsr["nsd"] = nsd
1868 # nsr_name = db_nsr["name"] # TODO short-name??
1869
1870 # read from db: vnf's of this ns
1871 stage[1] = "Getting vnfrs from db"
1872 self.logger.debug(logging_text + stage[1])
1873 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1874
1875 # read from db: vnfd's for every vnf
1876 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1877 db_vnfds = {} # every vnfd data indexed by vnf id
1878 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1879
1880 # for each vnf in ns, read vnfd
1881 for vnfr in db_vnfrs_list:
1882 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1883 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1884 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1885 # if we haven't this vnfd, read it from db
1886 if vnfd_id not in db_vnfds:
1887 # read from db
1888 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1889 self.logger.debug(logging_text + stage[1])
1890 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1891
1892 # store vnfd
1893 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1894 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1895 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1896
1897 # Get or generates the _admin.deployed.VCA list
1898 vca_deployed_list = None
1899 if db_nsr["_admin"].get("deployed"):
1900 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1901 if vca_deployed_list is None:
1902 vca_deployed_list = []
1903 configuration_status_list = []
1904 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1905 db_nsr_update["configurationStatus"] = configuration_status_list
1906 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1907 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1908 elif isinstance(vca_deployed_list, dict):
1909 # maintain backward compatibility. Change a dict to list at database
1910 vca_deployed_list = list(vca_deployed_list.values())
1911 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1912 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1913
1914 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1915 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1916 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1917
1918 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1919 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1920 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1921
1922 # n2vc_redesign STEP 2 Deploy Network Scenario
1923 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1924 self._write_op_status(
1925 op_id=nslcmop_id,
1926 stage=stage
1927 )
1928
1929 stage[1] = "Deploying KDUs,"
1930 # self.logger.debug(logging_text + "Before deploy_kdus")
1931 # Call to deploy_kdus in case exists the "vdu:kdu" param
1932 await self.deploy_kdus(
1933 logging_text=logging_text,
1934 nsr_id=nsr_id,
1935 nslcmop_id=nslcmop_id,
1936 db_vnfrs=db_vnfrs,
1937 db_vnfds=db_vnfds,
1938 task_instantiation_info=tasks_dict_info,
1939 )
1940
1941 stage[1] = "Getting VCA public key."
1942 # n2vc_redesign STEP 1 Get VCA public ssh-key
1943 # feature 1429. Add n2vc public key to needed VMs
1944 n2vc_key = self.n2vc.get_public_key()
1945 n2vc_key_list = [n2vc_key]
1946 if self.vca_config.get("public_key"):
1947 n2vc_key_list.append(self.vca_config["public_key"])
1948
1949 stage[1] = "Deploying NS at VIM."
1950 task_ro = asyncio.ensure_future(
1951 self.instantiate_RO(
1952 logging_text=logging_text,
1953 nsr_id=nsr_id,
1954 nsd=nsd,
1955 db_nsr=db_nsr,
1956 db_nslcmop=db_nslcmop,
1957 db_vnfrs=db_vnfrs,
1958 db_vnfds_ref=db_vnfds_ref,
1959 n2vc_key_list=n2vc_key_list,
1960 stage=stage
1961 )
1962 )
1963 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1964 tasks_dict_info[task_ro] = "Deploying at VIM"
1965
1966 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1967 stage[1] = "Deploying Execution Environments."
1968 self.logger.debug(logging_text + stage[1])
1969
1970 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1971 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1972 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1973 vnfd_id = c_vnf["vnfd-id-ref"]
1974 vnfd = db_vnfds_ref[vnfd_id]
1975 member_vnf_index = str(c_vnf["member-vnf-index"])
1976 db_vnfr = db_vnfrs[member_vnf_index]
1977 base_folder = vnfd["_admin"]["storage"]
1978 vdu_id = None
1979 vdu_index = 0
1980 vdu_name = None
1981 kdu_name = None
1982
1983 # Get additional parameters
1984 deploy_params = {}
1985 if db_vnfr.get("additionalParamsForVnf"):
1986 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1987
1988 descriptor_config = vnfd.get("vnf-configuration")
1989 if descriptor_config:
1990 self._deploy_n2vc(
1991 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1992 db_nsr=db_nsr,
1993 db_vnfr=db_vnfr,
1994 nslcmop_id=nslcmop_id,
1995 nsr_id=nsr_id,
1996 nsi_id=nsi_id,
1997 vnfd_id=vnfd_id,
1998 vdu_id=vdu_id,
1999 kdu_name=kdu_name,
2000 member_vnf_index=member_vnf_index,
2001 vdu_index=vdu_index,
2002 vdu_name=vdu_name,
2003 deploy_params=deploy_params,
2004 descriptor_config=descriptor_config,
2005 base_folder=base_folder,
2006 task_instantiation_info=tasks_dict_info,
2007 stage=stage
2008 )
2009
2010 # Deploy charms for each VDU that supports one.
2011 for vdud in get_iterable(vnfd, 'vdu'):
2012 vdu_id = vdud["id"]
2013 descriptor_config = vdud.get('vdu-configuration')
2014 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2015 if vdur.get("additionalParams"):
2016 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2017 else:
2018 deploy_params_vdu = deploy_params
2019 if descriptor_config:
2020 # look for vdu index in the db_vnfr["vdu"] section
2021 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2022 # if vdur["vdu-id-ref"] == vdu_id:
2023 # break
2024 # else:
2025 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2026 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2027 # vdu_name = vdur.get("name")
2028 vdu_name = None
2029 kdu_name = None
2030 for vdu_index in range(int(vdud.get("count", 1))):
2031 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2032 self._deploy_n2vc(
2033 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2034 member_vnf_index, vdu_id, vdu_index),
2035 db_nsr=db_nsr,
2036 db_vnfr=db_vnfr,
2037 nslcmop_id=nslcmop_id,
2038 nsr_id=nsr_id,
2039 nsi_id=nsi_id,
2040 vnfd_id=vnfd_id,
2041 vdu_id=vdu_id,
2042 kdu_name=kdu_name,
2043 member_vnf_index=member_vnf_index,
2044 vdu_index=vdu_index,
2045 vdu_name=vdu_name,
2046 deploy_params=deploy_params_vdu,
2047 descriptor_config=descriptor_config,
2048 base_folder=base_folder,
2049 task_instantiation_info=tasks_dict_info,
2050 stage=stage
2051 )
2052 for kdud in get_iterable(vnfd, 'kdu'):
2053 kdu_name = kdud["name"]
2054 descriptor_config = kdud.get('kdu-configuration')
2055 if descriptor_config:
2056 vdu_id = None
2057 vdu_index = 0
2058 vdu_name = None
2059 # look for vdu index in the db_vnfr["vdu"] section
2060 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2061 # if vdur["vdu-id-ref"] == vdu_id:
2062 # break
2063 # else:
2064 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2065 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2066 # vdu_name = vdur.get("name")
2067 # vdu_name = None
2068
2069 self._deploy_n2vc(
2070 logging_text=logging_text,
2071 db_nsr=db_nsr,
2072 db_vnfr=db_vnfr,
2073 nslcmop_id=nslcmop_id,
2074 nsr_id=nsr_id,
2075 nsi_id=nsi_id,
2076 vnfd_id=vnfd_id,
2077 vdu_id=vdu_id,
2078 kdu_name=kdu_name,
2079 member_vnf_index=member_vnf_index,
2080 vdu_index=vdu_index,
2081 vdu_name=vdu_name,
2082 deploy_params=deploy_params,
2083 descriptor_config=descriptor_config,
2084 base_folder=base_folder,
2085 task_instantiation_info=tasks_dict_info,
2086 stage=stage
2087 )
2088
2089 # Check if this NS has a charm configuration
2090 descriptor_config = nsd.get("ns-configuration")
2091 if descriptor_config and descriptor_config.get("juju"):
2092 vnfd_id = None
2093 db_vnfr = None
2094 member_vnf_index = None
2095 vdu_id = None
2096 kdu_name = None
2097 vdu_index = 0
2098 vdu_name = None
2099
2100 # Get additional parameters
2101 deploy_params = {}
2102 if db_nsr.get("additionalParamsForNs"):
2103 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
2104 base_folder = nsd["_admin"]["storage"]
2105 self._deploy_n2vc(
2106 logging_text=logging_text,
2107 db_nsr=db_nsr,
2108 db_vnfr=db_vnfr,
2109 nslcmop_id=nslcmop_id,
2110 nsr_id=nsr_id,
2111 nsi_id=nsi_id,
2112 vnfd_id=vnfd_id,
2113 vdu_id=vdu_id,
2114 kdu_name=kdu_name,
2115 member_vnf_index=member_vnf_index,
2116 vdu_index=vdu_index,
2117 vdu_name=vdu_name,
2118 deploy_params=deploy_params,
2119 descriptor_config=descriptor_config,
2120 base_folder=base_folder,
2121 task_instantiation_info=tasks_dict_info,
2122 stage=stage
2123 )
2124
2125 # rest of staff will be done at finally
2126
2127 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2128 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2129 exc = e
2130 except asyncio.CancelledError:
2131 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2132 exc = "Operation was cancelled"
2133 except Exception as e:
2134 exc = traceback.format_exc()
2135 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2136 finally:
2137 if exc:
2138 error_list.append(str(exc))
2139 try:
2140 # wait for pending tasks
2141 if tasks_dict_info:
2142 stage[1] = "Waiting for instantiate pending tasks."
2143 self.logger.debug(logging_text + stage[1])
2144 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2145 stage, nslcmop_id, nsr_id=nsr_id)
2146 stage[1] = stage[2] = ""
2147 except asyncio.CancelledError:
2148 error_list.append("Cancelled")
2149 # TODO cancel all tasks
2150 except Exception as exc:
2151 error_list.append(str(exc))
2152
2153 # update operation-status
2154 db_nsr_update["operational-status"] = "running"
2155 # let's begin with VCA 'configured' status (later we can change it)
2156 db_nsr_update["config-status"] = "configured"
2157 for task, task_name in tasks_dict_info.items():
2158 if not task.done() or task.cancelled() or task.exception():
2159 if task_name.startswith(self.task_name_deploy_vca):
2160 # A N2VC task is pending
2161 db_nsr_update["config-status"] = "failed"
2162 else:
2163 # RO or KDU task is pending
2164 db_nsr_update["operational-status"] = "failed"
2165
2166 # update status at database
2167 if error_list:
2168 error_detail = ". ".join(error_list)
2169 self.logger.error(logging_text + error_detail)
2170 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
2171 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
2172
2173 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2174 db_nslcmop_update["detailed-status"] = error_detail
2175 nslcmop_operation_state = "FAILED"
2176 ns_state = "BROKEN"
2177 else:
2178 error_detail = None
2179 error_description_nsr = error_description_nslcmop = None
2180 ns_state = "READY"
2181 db_nsr_update["detailed-status"] = "Done"
2182 db_nslcmop_update["detailed-status"] = "Done"
2183 nslcmop_operation_state = "COMPLETED"
2184
2185 if db_nsr:
2186 self._write_ns_status(
2187 nsr_id=nsr_id,
2188 ns_state=ns_state,
2189 current_operation="IDLE",
2190 current_operation_id=None,
2191 error_description=error_description_nsr,
2192 error_detail=error_detail,
2193 other_update=db_nsr_update
2194 )
2195 self._write_op_status(
2196 op_id=nslcmop_id,
2197 stage="",
2198 error_message=error_description_nslcmop,
2199 operation_state=nslcmop_operation_state,
2200 other_update=db_nslcmop_update,
2201 )
2202
2203 if nslcmop_operation_state:
2204 try:
2205 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2206 "operationState": nslcmop_operation_state},
2207 loop=self.loop)
2208 except Exception as e:
2209 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2210
2211 self.logger.debug(logging_text + "Exit")
2212 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2213
2214 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2215 timeout: int = 3600, vca_type: str = None) -> bool:
2216
2217 # steps:
2218 # 1. find all relations for this VCA
2219 # 2. wait for other peers related
2220 # 3. add relations
2221
2222 try:
2223 vca_type = vca_type or "lxc_proxy_charm"
2224
2225 # STEP 1: find all relations for this VCA
2226
2227 # read nsr record
2228 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2229 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2230
2231 # this VCA data
2232 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2233
2234 # read all ns-configuration relations
2235 ns_relations = list()
2236 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2237 if db_ns_relations:
2238 for r in db_ns_relations:
2239 # check if this VCA is in the relation
2240 if my_vca.get('member-vnf-index') in\
2241 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2242 ns_relations.append(r)
2243
2244 # read all vnf-configuration relations
2245 vnf_relations = list()
2246 db_vnfd_list = db_nsr.get('vnfd-id')
2247 if db_vnfd_list:
2248 for vnfd in db_vnfd_list:
2249 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2250 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2251 if db_vnf_relations:
2252 for r in db_vnf_relations:
2253 # check if this VCA is in the relation
2254 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2255 vnf_relations.append(r)
2256
2257 # if no relations, terminate
2258 if not ns_relations and not vnf_relations:
2259 self.logger.debug(logging_text + ' No relations')
2260 return True
2261
2262 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2263
2264 # add all relations
2265 start = time()
2266 while True:
2267 # check timeout
2268 now = time()
2269 if now - start >= timeout:
2270 self.logger.error(logging_text + ' : timeout adding relations')
2271 return False
2272
2273 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2274 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2275
2276 # for each defined NS relation, find the VCA's related
2277 for r in ns_relations:
2278 from_vca_ee_id = None
2279 to_vca_ee_id = None
2280 from_vca_endpoint = None
2281 to_vca_endpoint = None
2282 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2283 for vca in vca_list:
2284 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2285 and vca.get('config_sw_installed'):
2286 from_vca_ee_id = vca.get('ee_id')
2287 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2288 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2289 and vca.get('config_sw_installed'):
2290 to_vca_ee_id = vca.get('ee_id')
2291 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2292 if from_vca_ee_id and to_vca_ee_id:
2293 # add relation
2294 await self.vca_map[vca_type].add_relation(
2295 ee_id_1=from_vca_ee_id,
2296 ee_id_2=to_vca_ee_id,
2297 endpoint_1=from_vca_endpoint,
2298 endpoint_2=to_vca_endpoint)
2299 # remove entry from relations list
2300 ns_relations.remove(r)
2301 else:
2302 # check failed peers
2303 try:
2304 vca_status_list = db_nsr.get('configurationStatus')
2305 if vca_status_list:
2306 for i in range(len(vca_list)):
2307 vca = vca_list[i]
2308 vca_status = vca_status_list[i]
2309 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2310 if vca_status.get('status') == 'BROKEN':
2311 # peer broken: remove relation from list
2312 ns_relations.remove(r)
2313 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2314 if vca_status.get('status') == 'BROKEN':
2315 # peer broken: remove relation from list
2316 ns_relations.remove(r)
2317 except Exception:
2318 # ignore
2319 pass
2320
2321 # for each defined VNF relation, find the VCA's related
2322 for r in vnf_relations:
2323 from_vca_ee_id = None
2324 to_vca_ee_id = None
2325 from_vca_endpoint = None
2326 to_vca_endpoint = None
2327 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2328 for vca in vca_list:
2329 if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2330 from_vca_ee_id = vca.get('ee_id')
2331 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2332 if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2333 to_vca_ee_id = vca.get('ee_id')
2334 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2335 if from_vca_ee_id and to_vca_ee_id:
2336 # add relation
2337 await self.vca_map[vca_type].add_relation(
2338 ee_id_1=from_vca_ee_id,
2339 ee_id_2=to_vca_ee_id,
2340 endpoint_1=from_vca_endpoint,
2341 endpoint_2=to_vca_endpoint)
2342 # remove entry from relations list
2343 vnf_relations.remove(r)
2344 else:
2345 # check failed peers
2346 try:
2347 vca_status_list = db_nsr.get('configurationStatus')
2348 if vca_status_list:
2349 for i in range(len(vca_list)):
2350 vca = vca_list[i]
2351 vca_status = vca_status_list[i]
2352 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2353 if vca_status.get('status') == 'BROKEN':
2354 # peer broken: remove relation from list
2355 ns_relations.remove(r)
2356 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2357 if vca_status.get('status') == 'BROKEN':
2358 # peer broken: remove relation from list
2359 ns_relations.remove(r)
2360 except Exception:
2361 # ignore
2362 pass
2363
2364 # wait for next try
2365 await asyncio.sleep(5.0)
2366
2367 if not ns_relations and not vnf_relations:
2368 self.logger.debug('Relations added')
2369 break
2370
2371 return True
2372
2373 except Exception as e:
2374 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2375 return False
2376
2377 def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
2378 """
2379 callback for kdu install intended to store the returned kdu_instance at database
2380 :return: None
2381 """
2382 db_update = {}
2383 try:
2384 result = task.result()
2385 if on_done:
2386 db_update[on_done] = str(result)
2387 except Exception as e:
2388 if on_exc:
2389 db_update[on_exc] = str(e)
2390 if db_update:
2391 try:
2392 self.update_db_2(item, _id, db_update)
2393 except Exception:
2394 pass
2395
2396 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2397 # Launch kdus if present in the descriptor
2398
2399 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
2400
2401 def _get_cluster_id(cluster_id, cluster_type):
2402 nonlocal k8scluster_id_2_uuic
2403 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2404 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2405
2406 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2407 if not db_k8scluster:
2408 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2409 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2410 if not k8s_id:
2411 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
2412 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2413 return k8s_id
2414
2415 logging_text += "Deploy kdus: "
2416 step = ""
2417 try:
2418 db_nsr_update = {"_admin.deployed.K8s": []}
2419 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2420
2421 index = 0
2422 updated_cluster_list = []
2423
2424 for vnfr_data in db_vnfrs.values():
2425 for kdur in get_iterable(vnfr_data, "kdur"):
2426 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2427 vnfd_id = vnfr_data.get('vnfd-id')
2428 namespace = kdur.get("k8s-namespace")
2429 if kdur.get("helm-chart"):
2430 kdumodel = kdur["helm-chart"]
2431 k8sclustertype = "helm-chart"
2432 elif kdur.get("juju-bundle"):
2433 kdumodel = kdur["juju-bundle"]
2434 k8sclustertype = "juju-bundle"
2435 else:
2436 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2437 "juju-bundle. Maybe an old NBI version is running".
2438 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2439 # check if kdumodel is a file and exists
2440 try:
2441 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2442 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2443 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2444 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2445 kdumodel)
2446 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2447 kdumodel = self.fs.path + filename
2448 except (asyncio.TimeoutError, asyncio.CancelledError):
2449 raise
2450 except Exception: # it is not a file
2451 pass
2452
2453 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2454 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2455 cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
2456
2457 if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
2458 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2459 self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
2460 if del_repo_list or added_repo_dict:
2461 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2462 updated = {'_admin.helm_charts_added.' +
2463 item: name for item, name in added_repo_dict.items()}
2464 self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
2465 "to_add: {}".format(k8s_cluster_id, del_repo_list,
2466 added_repo_dict))
2467 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2468 updated_cluster_list.append(cluster_uuid)
2469
2470 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2471 kdur["kdu-name"], k8s_cluster_id)
2472
2473 k8s_instace_info = {"kdu-instance": None,
2474 "k8scluster-uuid": cluster_uuid,
2475 "k8scluster-type": k8sclustertype,
2476 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2477 "kdu-name": kdur["kdu-name"],
2478 "kdu-model": kdumodel,
2479 "namespace": namespace}
2480 db_path = "_admin.deployed.K8s.{}".format(index)
2481 db_nsr_update[db_path] = k8s_instace_info
2482 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2483
2484 db_dict = {"collection": "nsrs",
2485 "filter": {"_id": nsr_id},
2486 "path": db_path}
2487
2488 task = asyncio.ensure_future(
2489 self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
2490 atomic=True, params=desc_params,
2491 db_dict=db_dict, timeout=600,
2492 kdu_name=kdur["kdu-name"], namespace=namespace))
2493
2494 task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
2495 on_done=db_path + ".kdu-instance",
2496 on_exc=db_path + ".detailed-status"))
2497 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2498 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2499
2500 index += 1
2501
2502 except (LcmException, asyncio.CancelledError):
2503 raise
2504 except Exception as e:
2505 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2506 if isinstance(e, (N2VCException, DbException)):
2507 self.logger.error(logging_text + msg)
2508 else:
2509 self.logger.critical(logging_text + msg, exc_info=True)
2510 raise LcmException(msg)
2511 finally:
2512 if db_nsr_update:
2513 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2514
2515 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2516 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2517 base_folder, task_instantiation_info, stage):
2518 # launch instantiate_N2VC in a asyncio task and register task object
2519 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2520 # if not found, create one entry and update database
2521 # fill db_nsr._admin.deployed.VCA.<index>
2522
2523 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2524 if descriptor_config.get("juju"): # There is one execution envioronment of type juju
2525 ee_list = [descriptor_config]
2526 elif descriptor_config.get("execution-environment-list"):
2527 ee_list = descriptor_config.get("execution-environment-list")
2528 else: # other types as script are not supported
2529 ee_list = []
2530
2531 for ee_item in ee_list:
2532 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2533 ee_item.get("helm-chart")))
2534 if ee_item.get("juju"):
2535 vca_name = ee_item['juju'].get('charm')
2536 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2537 if ee_item['juju'].get('cloud') == "k8s":
2538 vca_type = "k8s_proxy_charm"
2539 elif ee_item['juju'].get('proxy') is False:
2540 vca_type = "native_charm"
2541 elif ee_item.get("helm-chart"):
2542 vca_name = ee_item['helm-chart']
2543 vca_type = "helm"
2544 else:
2545 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
2546 continue
2547
2548 vca_index = -1
2549 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2550 if not vca_deployed:
2551 continue
2552 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2553 vca_deployed.get("vdu_id") == vdu_id and \
2554 vca_deployed.get("kdu_name") == kdu_name and \
2555 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2556 break
2557 else:
2558 # not found, create one.
2559 vca_deployed = {
2560 "member-vnf-index": member_vnf_index,
2561 "vdu_id": vdu_id,
2562 "kdu_name": kdu_name,
2563 "vdu_count_index": vdu_index,
2564 "operational-status": "init", # TODO revise
2565 "detailed-status": "", # TODO revise
2566 "step": "initial-deploy", # TODO revise
2567 "vnfd_id": vnfd_id,
2568 "vdu_name": vdu_name,
2569 "type": vca_type
2570 }
2571 vca_index += 1
2572
2573 # create VCA and configurationStatus in db
2574 db_dict = {
2575 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2576 "configurationStatus.{}".format(vca_index): dict()
2577 }
2578 self.update_db_2("nsrs", nsr_id, db_dict)
2579
2580 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2581
2582 # Launch task
2583 task_n2vc = asyncio.ensure_future(
2584 self.instantiate_N2VC(
2585 logging_text=logging_text,
2586 vca_index=vca_index,
2587 nsi_id=nsi_id,
2588 db_nsr=db_nsr,
2589 db_vnfr=db_vnfr,
2590 vdu_id=vdu_id,
2591 kdu_name=kdu_name,
2592 vdu_index=vdu_index,
2593 deploy_params=deploy_params,
2594 config_descriptor=descriptor_config,
2595 base_folder=base_folder,
2596 nslcmop_id=nslcmop_id,
2597 stage=stage,
2598 vca_type=vca_type,
2599 vca_name=vca_name
2600 )
2601 )
2602 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2603 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2604 member_vnf_index or "", vdu_id or "")
2605
2606 # Check if this VNFD has a configured terminate action
2607 def _has_terminate_config_primitive(self, vnfd):
2608 vnf_config = vnfd.get("vnf-configuration")
2609 if vnf_config and vnf_config.get("terminate-config-primitive"):
2610 return True
2611 else:
2612 return False
2613
2614 @staticmethod
2615 def _get_terminate_config_primitive_seq_list(vnfd):
2616 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2617 # No need to check for existing primitive twice, already done before
2618 vnf_config = vnfd.get("vnf-configuration")
2619 seq_list = vnf_config.get("terminate-config-primitive")
2620 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2621 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2622 return seq_list_sorted
2623
2624 @staticmethod
2625 def _create_nslcmop(nsr_id, operation, params):
2626 """
2627 Creates a ns-lcm-opp content to be stored at database.
2628 :param nsr_id: internal id of the instance
2629 :param operation: instantiate, terminate, scale, action, ...
2630 :param params: user parameters for the operation
2631 :return: dictionary following SOL005 format
2632 """
2633 # Raise exception if invalid arguments
2634 if not (nsr_id and operation and params):
2635 raise LcmException(
2636 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2637 now = time()
2638 _id = str(uuid4())
2639 nslcmop = {
2640 "id": _id,
2641 "_id": _id,
2642 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2643 "operationState": "PROCESSING",
2644 "statusEnteredTime": now,
2645 "nsInstanceId": nsr_id,
2646 "lcmOperationType": operation,
2647 "startTime": now,
2648 "isAutomaticInvocation": False,
2649 "operationParams": params,
2650 "isCancelPending": False,
2651 "links": {
2652 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2653 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2654 }
2655 }
2656 return nslcmop
2657
2658 def _format_additional_params(self, params):
2659 params = params or {}
2660 for key, value in params.items():
2661 if str(value).startswith("!!yaml "):
2662 params[key] = yaml.safe_load(value[7:])
2663 return params
2664
2665 def _get_terminate_primitive_params(self, seq, vnf_index):
2666 primitive = seq.get('name')
2667 primitive_params = {}
2668 params = {
2669 "member_vnf_index": vnf_index,
2670 "primitive": primitive,
2671 "primitive_params": primitive_params,
2672 }
2673 desc_params = {}
2674 return self._map_primitive_params(seq, params, desc_params)
2675
2676 # sub-operations
2677
2678 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2679 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2680 if op.get('operationState') == 'COMPLETED':
2681 # b. Skip sub-operation
2682 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2683 return self.SUBOPERATION_STATUS_SKIP
2684 else:
2685 # c. retry executing sub-operation
2686 # The sub-operation exists, and operationState != 'COMPLETED'
2687 # Update operationState = 'PROCESSING' to indicate a retry.
2688 operationState = 'PROCESSING'
2689 detailed_status = 'In progress'
2690 self._update_suboperation_status(
2691 db_nslcmop, op_index, operationState, detailed_status)
2692 # Return the sub-operation index
2693 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2694 # with arguments extracted from the sub-operation
2695 return op_index
2696
2697 # Find a sub-operation where all keys in a matching dictionary must match
2698 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2699 def _find_suboperation(self, db_nslcmop, match):
2700 if db_nslcmop and match:
2701 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2702 for i, op in enumerate(op_list):
2703 if all(op.get(k) == match[k] for k in match):
2704 return i
2705 return self.SUBOPERATION_STATUS_NOT_FOUND
2706
2707 # Update status for a sub-operation given its index
2708 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2709 # Update DB for HA tasks
2710 q_filter = {'_id': db_nslcmop['_id']}
2711 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2712 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2713 self.db.set_one("nslcmops",
2714 q_filter=q_filter,
2715 update_dict=update_dict,
2716 fail_on_empty=False)
2717
2718 # Add sub-operation, return the index of the added sub-operation
2719 # Optionally, set operationState, detailed-status, and operationType
2720 # Status and type are currently set for 'scale' sub-operations:
2721 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2722 # 'detailed-status' : status message
2723 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2724 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2725 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2726 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2727 RO_nsr_id=None, RO_scaling_info=None):
2728 if not db_nslcmop:
2729 return self.SUBOPERATION_STATUS_NOT_FOUND
2730 # Get the "_admin.operations" list, if it exists
2731 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2732 op_list = db_nslcmop_admin.get('operations')
2733 # Create or append to the "_admin.operations" list
2734 new_op = {'member_vnf_index': vnf_index,
2735 'vdu_id': vdu_id,
2736 'vdu_count_index': vdu_count_index,
2737 'primitive': primitive,
2738 'primitive_params': mapped_primitive_params}
2739 if operationState:
2740 new_op['operationState'] = operationState
2741 if detailed_status:
2742 new_op['detailed-status'] = detailed_status
2743 if operationType:
2744 new_op['lcmOperationType'] = operationType
2745 if RO_nsr_id:
2746 new_op['RO_nsr_id'] = RO_nsr_id
2747 if RO_scaling_info:
2748 new_op['RO_scaling_info'] = RO_scaling_info
2749 if not op_list:
2750 # No existing operations, create key 'operations' with current operation as first list element
2751 db_nslcmop_admin.update({'operations': [new_op]})
2752 op_list = db_nslcmop_admin.get('operations')
2753 else:
2754 # Existing operations, append operation to list
2755 op_list.append(new_op)
2756
2757 db_nslcmop_update = {'_admin.operations': op_list}
2758 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2759 op_index = len(op_list) - 1
2760 return op_index
2761
2762 # Helper methods for scale() sub-operations
2763
2764 # pre-scale/post-scale:
2765 # Check for 3 different cases:
2766 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2767 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2768 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2769 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2770 operationType, RO_nsr_id=None, RO_scaling_info=None):
2771 # Find this sub-operation
2772 if RO_nsr_id and RO_scaling_info:
2773 operationType = 'SCALE-RO'
2774 match = {
2775 'member_vnf_index': vnf_index,
2776 'RO_nsr_id': RO_nsr_id,
2777 'RO_scaling_info': RO_scaling_info,
2778 }
2779 else:
2780 match = {
2781 'member_vnf_index': vnf_index,
2782 'primitive': vnf_config_primitive,
2783 'primitive_params': primitive_params,
2784 'lcmOperationType': operationType
2785 }
2786 op_index = self._find_suboperation(db_nslcmop, match)
2787 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2788 # a. New sub-operation
2789 # The sub-operation does not exist, add it.
2790 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2791 # The following parameters are set to None for all kind of scaling:
2792 vdu_id = None
2793 vdu_count_index = None
2794 vdu_name = None
2795 if RO_nsr_id and RO_scaling_info:
2796 vnf_config_primitive = None
2797 primitive_params = None
2798 else:
2799 RO_nsr_id = None
2800 RO_scaling_info = None
2801 # Initial status for sub-operation
2802 operationState = 'PROCESSING'
2803 detailed_status = 'In progress'
2804 # Add sub-operation for pre/post-scaling (zero or more operations)
2805 self._add_suboperation(db_nslcmop,
2806 vnf_index,
2807 vdu_id,
2808 vdu_count_index,
2809 vdu_name,
2810 vnf_config_primitive,
2811 primitive_params,
2812 operationState,
2813 detailed_status,
2814 operationType,
2815 RO_nsr_id,
2816 RO_scaling_info)
2817 return self.SUBOPERATION_STATUS_NEW
2818 else:
2819 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2820 # or op_index (operationState != 'COMPLETED')
2821 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2822
2823 # Function to return execution_environment id
2824
2825 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2826 # TODO vdu_index_count
2827 for vca in vca_deployed_list:
2828 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2829 return vca["ee_id"]
2830
2831 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
2832 vca_index, destroy_ee=True, exec_primitives=True):
2833 """
2834 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2835 :param logging_text:
2836 :param db_nslcmop:
2837 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2838 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2839 :param vca_index: index in the database _admin.deployed.VCA
2840 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2841 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2842 not executed properly
2843 :return: None or exception
2844 """
2845
2846 self.logger.debug(
2847 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2848 vca_index, vca_deployed, config_descriptor, destroy_ee
2849 )
2850 )
2851
2852 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
2853
2854 # execute terminate_primitives
2855 if exec_primitives:
2856 terminate_primitives = config_descriptor.get("terminate-config-primitive")
2857 vdu_id = vca_deployed.get("vdu_id")
2858 vdu_count_index = vca_deployed.get("vdu_count_index")
2859 vdu_name = vca_deployed.get("vdu_name")
2860 vnf_index = vca_deployed.get("member-vnf-index")
2861 if terminate_primitives and vca_deployed.get("needed_terminate"):
2862 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2863 terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
2864 for seq in terminate_primitives:
2865 # For each sequence in list, get primitive and call _ns_execute_primitive()
2866 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2867 vnf_index, seq.get("name"))
2868 self.logger.debug(logging_text + step)
2869 # Create the primitive for each sequence, i.e. "primitive": "touch"
2870 primitive = seq.get('name')
2871 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2872 # The following 3 parameters are currently set to None for 'terminate':
2873 # vdu_id, vdu_count_index, vdu_name
2874
2875 # Add sub-operation
2876 self._add_suboperation(db_nslcmop,
2877 vnf_index,
2878 vdu_id,
2879 vdu_count_index,
2880 vdu_name,
2881 primitive,
2882 mapped_primitive_params)
2883 # Sub-operations: Call _ns_execute_primitive() instead of action()
2884 try:
2885 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2886 mapped_primitive_params,
2887 vca_type=vca_type)
2888 except LcmException:
2889 # this happens when VCA is not deployed. In this case it is not needed to terminate
2890 continue
2891 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2892 if result not in result_ok:
2893 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2894 "error {}".format(seq.get("name"), vnf_index, result_detail))
2895 # set that this VCA do not need terminated
2896 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2897 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2898
2899 if destroy_ee:
2900 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
2901
2902 async def _delete_all_N2VC(self, db_nsr: dict):
2903 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2904 namespace = "." + db_nsr["_id"]
2905 try:
2906 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2907 except N2VCNotFound: # already deleted. Skip
2908 pass
2909 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2910
2911 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2912 """
2913 Terminates a deployment from RO
2914 :param logging_text:
2915 :param nsr_deployed: db_nsr._admin.deployed
2916 :param nsr_id:
2917 :param nslcmop_id:
2918 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2919 this method will update only the index 2, but it will write on database the concatenated content of the list
2920 :return:
2921 """
2922 db_nsr_update = {}
2923 failed_detail = []
2924 ro_nsr_id = ro_delete_action = None
2925 if nsr_deployed and nsr_deployed.get("RO"):
2926 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2927 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2928 try:
2929 if ro_nsr_id:
2930 stage[2] = "Deleting ns from VIM."
2931 db_nsr_update["detailed-status"] = " ".join(stage)
2932 self._write_op_status(nslcmop_id, stage)
2933 self.logger.debug(logging_text + stage[2])
2934 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2935 self._write_op_status(nslcmop_id, stage)
2936 desc = await self.RO.delete("ns", ro_nsr_id)
2937 ro_delete_action = desc["action_id"]
2938 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2939 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2940 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2941 if ro_delete_action:
2942 # wait until NS is deleted from VIM
2943 stage[2] = "Waiting ns deleted from VIM."
2944 detailed_status_old = None
2945 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2946 ro_delete_action))
2947 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2948 self._write_op_status(nslcmop_id, stage)
2949
2950 delete_timeout = 20 * 60 # 20 minutes
2951 while delete_timeout > 0:
2952 desc = await self.RO.show(
2953 "ns",
2954 item_id_name=ro_nsr_id,
2955 extra_item="action",
2956 extra_item_id=ro_delete_action)
2957
2958 # deploymentStatus
2959 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2960
2961 ns_status, ns_status_info = self.RO.check_action_status(desc)
2962 if ns_status == "ERROR":
2963 raise ROclient.ROClientException(ns_status_info)
2964 elif ns_status == "BUILD":
2965 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2966 elif ns_status == "ACTIVE":
2967 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2968 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2969 break
2970 else:
2971 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2972 if stage[2] != detailed_status_old:
2973 detailed_status_old = stage[2]
2974 db_nsr_update["detailed-status"] = " ".join(stage)
2975 self._write_op_status(nslcmop_id, stage)
2976 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2977 await asyncio.sleep(5, loop=self.loop)
2978 delete_timeout -= 5
2979 else: # delete_timeout <= 0:
2980 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2981
2982 except Exception as e:
2983 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2984 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2985 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2986 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2987 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2988 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2989 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2990 failed_detail.append("delete conflict: {}".format(e))
2991 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2992 else:
2993 failed_detail.append("delete error: {}".format(e))
2994 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2995
2996 # Delete nsd
2997 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2998 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2999 try:
3000 stage[2] = "Deleting nsd from RO."
3001 db_nsr_update["detailed-status"] = " ".join(stage)
3002 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3003 self._write_op_status(nslcmop_id, stage)
3004 await self.RO.delete("nsd", ro_nsd_id)
3005 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3006 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3007 except Exception as e:
3008 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3009 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3010 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3011 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3012 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3013 self.logger.debug(logging_text + failed_detail[-1])
3014 else:
3015 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3016 self.logger.error(logging_text + failed_detail[-1])
3017
3018 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3019 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3020 if not vnf_deployed or not vnf_deployed["id"]:
3021 continue
3022 try:
3023 ro_vnfd_id = vnf_deployed["id"]
3024 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3025 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3026 db_nsr_update["detailed-status"] = " ".join(stage)
3027 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3028 self._write_op_status(nslcmop_id, stage)
3029 await self.RO.delete("vnfd", ro_vnfd_id)
3030 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3031 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3032 except Exception as e:
3033 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3034 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3035 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3036 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3037 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3038 self.logger.debug(logging_text + failed_detail[-1])
3039 else:
3040 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3041 self.logger.error(logging_text + failed_detail[-1])
3042
3043 if failed_detail:
3044 stage[2] = "Error deleting from VIM"
3045 else:
3046 stage[2] = "Deleted from VIM"
3047 db_nsr_update["detailed-status"] = " ".join(stage)
3048 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3049 self._write_op_status(nslcmop_id, stage)
3050
3051 if failed_detail:
3052 raise LcmException("; ".join(failed_detail))
3053
3054 async def terminate(self, nsr_id, nslcmop_id):
3055 # Try to lock HA task here
3056 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3057 if not task_is_locked_by_me:
3058 return
3059
3060 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3061 self.logger.debug(logging_text + "Enter")
3062 timeout_ns_terminate = self.timeout_ns_terminate
3063 db_nsr = None
3064 db_nslcmop = None
3065 operation_params = None
3066 exc = None
3067 error_list = [] # annotates all failed error messages
3068 db_nslcmop_update = {}
3069 autoremove = False # autoremove after terminated
3070 tasks_dict_info = {}
3071 db_nsr_update = {}
3072 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3073 # ^ contains [stage, step, VIM-status]
3074 try:
3075 # wait for any previous tasks in process
3076 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3077
3078 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3079 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3080 operation_params = db_nslcmop.get("operationParams") or {}
3081 if operation_params.get("timeout_ns_terminate"):
3082 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3083 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3084 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3085
3086 db_nsr_update["operational-status"] = "terminating"
3087 db_nsr_update["config-status"] = "terminating"
3088 self._write_ns_status(
3089 nsr_id=nsr_id,
3090 ns_state="TERMINATING",
3091 current_operation="TERMINATING",
3092 current_operation_id=nslcmop_id,
3093 other_update=db_nsr_update
3094 )
3095 self._write_op_status(
3096 op_id=nslcmop_id,
3097 queuePosition=0,
3098 stage=stage
3099 )
3100 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3101 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3102 return
3103
3104 stage[1] = "Getting vnf descriptors from db."
3105 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3106 db_vnfds_from_id = {}
3107 db_vnfds_from_member_index = {}
3108 # Loop over VNFRs
3109 for vnfr in db_vnfrs_list:
3110 vnfd_id = vnfr["vnfd-id"]
3111 if vnfd_id not in db_vnfds_from_id:
3112 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3113 db_vnfds_from_id[vnfd_id] = vnfd
3114 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3115
3116 # Destroy individual execution environments when there are terminating primitives.
3117 # Rest of EE will be deleted at once
3118 # TODO - check before calling _destroy_N2VC
3119 # if not operation_params.get("skip_terminate_primitives"):#
3120 # or not vca.get("needed_terminate"):
3121 stage[0] = "Stage 2/3 execute terminating primitives."
3122 self.logger.debug(logging_text + stage[0])
3123 stage[1] = "Looking execution environment that needs terminate."
3124 self.logger.debug(logging_text + stage[1])
3125 self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3126 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3127 self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
3128 config_descriptor = None
3129 if not vca or not vca.get("ee_id"):
3130 continue
3131 if not vca.get("member-vnf-index"):
3132 # ns
3133 config_descriptor = db_nsr.get("ns-configuration")
3134 elif vca.get("vdu_id"):
3135 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3136 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3137 if vdud:
3138 config_descriptor = vdud.get("vdu-configuration")
3139 elif vca.get("kdu_name"):
3140 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3141 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3142 if kdud:
3143 config_descriptor = kdud.get("kdu-configuration")
3144 else:
3145 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3146 # For helm we must destroy_ee
3147 vca_type = vca.get("type")
3148 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3149 vca.get("needed_terminate"))
3150 self.logger.debug("vca type: {}".format(vca_type))
3151 if not vca_type == "helm":
3152 task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
3153 vca_index, False, exec_terminate_primitives))
3154 else:
3155 task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
3156 vca_index, True, exec_terminate_primitives))
3157 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3158
3159 # wait for pending tasks of terminate primitives
3160 if tasks_dict_info:
3161 self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
3162 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3163 min(self.timeout_charm_delete, timeout_ns_terminate),
3164 stage, nslcmop_id)
3165 if error_list:
3166 return # raise LcmException("; ".join(error_list))
3167 tasks_dict_info.clear()
3168
3169 # remove All execution environments at once
3170 stage[0] = "Stage 3/3 delete all."
3171
3172 if nsr_deployed.get("VCA"):
3173 stage[1] = "Deleting all execution environments."
3174 self.logger.debug(logging_text + stage[1])
3175 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3176 timeout=self.timeout_charm_delete))
3177 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3178 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3179
3180 # Delete from k8scluster
3181 stage[1] = "Deleting KDUs."
3182 self.logger.debug(logging_text + stage[1])
3183 # print(nsr_deployed)
3184 for kdu in get_iterable(nsr_deployed, "K8s"):
3185 if not kdu or not kdu.get("kdu-instance"):
3186 continue
3187 kdu_instance = kdu.get("kdu-instance")
3188 if kdu.get("k8scluster-type") in self.k8scluster_map:
3189 task_delete_kdu_instance = asyncio.ensure_future(
3190 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3191 cluster_uuid=kdu.get("k8scluster-uuid"),
3192 kdu_instance=kdu_instance))
3193 else:
3194 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3195 format(kdu.get("k8scluster-type")))
3196 continue
3197 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3198
3199 # remove from RO
3200 stage[1] = "Deleting ns from VIM."
3201 if self.ng_ro:
3202 task_delete_ro = asyncio.ensure_future(
3203 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3204 else:
3205 task_delete_ro = asyncio.ensure_future(
3206 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3207 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3208
3209 # rest of staff will be done at finally
3210
3211 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3212 self.logger.error(logging_text + "Exit Exception {}".format(e))
3213 exc = e
3214 except asyncio.CancelledError:
3215 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3216 exc = "Operation was cancelled"
3217 except Exception as e:
3218 exc = traceback.format_exc()
3219 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3220 finally:
3221 if exc:
3222 error_list.append(str(exc))
3223 try:
3224 # wait for pending tasks
3225 if tasks_dict_info:
3226 stage[1] = "Waiting for terminate pending tasks."
3227 self.logger.debug(logging_text + stage[1])
3228 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3229 stage, nslcmop_id)
3230 stage[1] = stage[2] = ""
3231 except asyncio.CancelledError:
3232 error_list.append("Cancelled")
3233 # TODO cancell all tasks
3234 except Exception as exc:
3235 error_list.append(str(exc))
3236 # update status at database
3237 if error_list:
3238 error_detail = "; ".join(error_list)
3239 # self.logger.error(logging_text + error_detail)
3240 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
3241 error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])
3242
3243 db_nsr_update["operational-status"] = "failed"
3244 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3245 db_nslcmop_update["detailed-status"] = error_detail
3246 nslcmop_operation_state = "FAILED"
3247 ns_state = "BROKEN"
3248 else:
3249 error_detail = None
3250 error_description_nsr = error_description_nslcmop = None
3251 ns_state = "NOT_INSTANTIATED"
3252 db_nsr_update["operational-status"] = "terminated"
3253 db_nsr_update["detailed-status"] = "Done"
3254 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3255 db_nslcmop_update["detailed-status"] = "Done"
3256 nslcmop_operation_state = "COMPLETED"
3257
3258 if db_nsr:
3259 self._write_ns_status(
3260 nsr_id=nsr_id,
3261 ns_state=ns_state,
3262 current_operation="IDLE",
3263 current_operation_id=None,
3264 error_description=error_description_nsr,
3265 error_detail=error_detail,
3266 other_update=db_nsr_update
3267 )
3268 self._write_op_status(
3269 op_id=nslcmop_id,
3270 stage="",
3271 error_message=error_description_nslcmop,
3272 operation_state=nslcmop_operation_state,
3273 other_update=db_nslcmop_update,
3274 )
3275 if operation_params:
3276 autoremove = operation_params.get("autoremove", False)
3277 if nslcmop_operation_state:
3278 try:
3279 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3280 "operationState": nslcmop_operation_state,
3281 "autoremove": autoremove},
3282 loop=self.loop)
3283 except Exception as e:
3284 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3285
3286 self.logger.debug(logging_text + "Exit")
3287 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3288
3289 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3290 time_start = time()
3291 error_detail_list = []
3292 error_list = []
3293 pending_tasks = list(created_tasks_info.keys())
3294 num_tasks = len(pending_tasks)
3295 num_done = 0
3296 stage[1] = "{}/{}.".format(num_done, num_tasks)
3297 self._write_op_status(nslcmop_id, stage)
3298 while pending_tasks:
3299 new_error = None
3300 _timeout = timeout + time_start - time()
3301 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3302 return_when=asyncio.FIRST_COMPLETED)
3303 num_done += len(done)
3304 if not done: # Timeout
3305 for task in pending_tasks:
3306 new_error = created_tasks_info[task] + ": Timeout"
3307 error_detail_list.append(new_error)
3308 error_list.append(new_error)
3309 break
3310 for task in done:
3311 if task.cancelled():
3312 exc = "Cancelled"
3313 else:
3314 exc = task.exception()
3315 if exc:
3316 if isinstance(exc, asyncio.TimeoutError):
3317 exc = "Timeout"
3318 new_error = created_tasks_info[task] + ": {}".format(exc)
3319 error_list.append(created_tasks_info[task])
3320 error_detail_list.append(new_error)
3321 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3322 K8sException)):
3323 self.logger.error(logging_text + new_error)
3324 else:
3325 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3326 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3327 else:
3328 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3329 stage[1] = "{}/{}.".format(num_done, num_tasks)
3330 if new_error:
3331 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3332 if nsr_id: # update also nsr
3333 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3334 "errorDetail": ". ".join(error_detail_list)})
3335 self._write_op_status(nslcmop_id, stage)
3336 return error_detail_list
3337
3338 @staticmethod
3339 def _map_primitive_params(primitive_desc, params, instantiation_params):
3340 """
3341 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3342 The default-value is used. If it is between < > it look for a value at instantiation_params
3343 :param primitive_desc: portion of VNFD/NSD that describes primitive
3344 :param params: Params provided by user
3345 :param instantiation_params: Instantiation params provided by user
3346 :return: a dictionary with the calculated params
3347 """
3348 calculated_params = {}
3349 for parameter in primitive_desc.get("parameter", ()):
3350 param_name = parameter["name"]
3351 if param_name in params:
3352 calculated_params[param_name] = params[param_name]
3353 elif "default-value" in parameter or "value" in parameter:
3354 if "value" in parameter:
3355 calculated_params[param_name] = parameter["value"]
3356 else:
3357 calculated_params[param_name] = parameter["default-value"]
3358 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3359 and calculated_params[param_name].endswith(">"):
3360 if calculated_params[param_name][1:-1] in instantiation_params:
3361 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3362 else:
3363 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3364 format(calculated_params[param_name], primitive_desc["name"]))
3365 else:
3366 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3367 format(param_name, primitive_desc["name"]))
3368
3369 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3370 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3371 width=256)
3372 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3373 calculated_params[param_name] = calculated_params[param_name][7:]
3374
3375 # add always ns_config_info if primitive name is config
3376 if primitive_desc["name"] == "config":
3377 if "ns_config_info" in instantiation_params:
3378 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3379 return calculated_params
3380
3381 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
3382 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3383 for vca in deployed_vca:
3384 if not vca:
3385 continue
3386 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3387 continue
3388 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3389 continue
3390 if kdu_name and kdu_name != vca["kdu_name"]:
3391 continue
3392 break
3393 else:
3394 # vca_deployed not found
3395 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3396 "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3397
3398 # get ee_id
3399 ee_id = vca.get("ee_id")
3400 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3401 if not ee_id:
3402 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3403 "execution environment"
3404 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3405 return ee_id, vca_type
3406
3407 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3408 retries_interval=30, timeout=None,
3409 vca_type=None, db_dict=None) -> (str, str):
3410 try:
3411 if primitive == "config":
3412 primitive_params = {"params": primitive_params}
3413
3414 vca_type = vca_type or "lxc_proxy_charm"
3415
3416 while retries >= 0:
3417 try:
3418 output = await asyncio.wait_for(
3419 self.vca_map[vca_type].exec_primitive(
3420 ee_id=ee_id,
3421 primitive_name=primitive,
3422 params_dict=primitive_params,
3423 progress_timeout=self.timeout_progress_primitive,
3424 total_timeout=self.timeout_primitive,
3425 db_dict=db_dict),
3426 timeout=timeout or self.timeout_primitive)
3427 # execution was OK
3428 break
3429 except asyncio.CancelledError:
3430 raise
3431 except Exception as e: # asyncio.TimeoutError
3432 if isinstance(e, asyncio.TimeoutError):
3433 e = "Timeout"
3434 retries -= 1
3435 if retries >= 0:
3436 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3437 # wait and retry
3438 await asyncio.sleep(retries_interval, loop=self.loop)
3439 else:
3440 return 'FAILED', str(e)
3441
3442 return 'COMPLETED', output
3443
3444 except (LcmException, asyncio.CancelledError):
3445 raise
3446 except Exception as e:
3447 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3448
3449 async def action(self, nsr_id, nslcmop_id):
3450
3451 # Try to lock HA task here
3452 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3453 if not task_is_locked_by_me:
3454 return
3455
3456 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3457 self.logger.debug(logging_text + "Enter")
3458 # get all needed from database
3459 db_nsr = None
3460 db_nslcmop = None
3461 db_nsr_update = {}
3462 db_nslcmop_update = {}
3463 nslcmop_operation_state = None
3464 error_description_nslcmop = None
3465 exc = None
3466 try:
3467 # wait for any previous tasks in process
3468 step = "Waiting for previous operations to terminate"
3469 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3470
3471 self._write_ns_status(
3472 nsr_id=nsr_id,
3473 ns_state=None,
3474 current_operation="RUNNING ACTION",
3475 current_operation_id=nslcmop_id
3476 )
3477
3478 step = "Getting information from database"
3479 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3480 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3481
3482 nsr_deployed = db_nsr["_admin"].get("deployed")
3483 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3484 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3485 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3486 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3487 primitive = db_nslcmop["operationParams"]["primitive"]
3488 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3489 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3490
3491 if vnf_index:
3492 step = "Getting vnfr from database"
3493 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3494 step = "Getting vnfd from database"
3495 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3496 else:
3497 step = "Getting nsd from database"
3498 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3499
3500 # for backward compatibility
3501 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3502 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3503 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3504 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3505
3506 # look for primitive
3507 config_primitive_desc = None
3508 if vdu_id:
3509 for vdu in get_iterable(db_vnfd, "vdu"):
3510 if vdu_id == vdu["id"]:
3511 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3512 if config_primitive["name"] == primitive:
3513 config_primitive_desc = config_primitive
3514 break
3515 break
3516 elif kdu_name:
3517 for kdu in get_iterable(db_vnfd, "kdu"):
3518 if kdu_name == kdu["name"]:
3519 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3520 if config_primitive["name"] == primitive:
3521 config_primitive_desc = config_primitive
3522 break
3523 break
3524 elif vnf_index:
3525 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3526 if config_primitive["name"] == primitive:
3527 config_primitive_desc = config_primitive
3528 break
3529 else:
3530 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3531 if config_primitive["name"] == primitive:
3532 config_primitive_desc = config_primitive
3533 break
3534
3535 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3536 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3537 format(primitive))
3538
3539 if vnf_index:
3540 if vdu_id:
3541 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3542 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3543 elif kdu_name:
3544 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3545 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3546 else:
3547 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3548 else:
3549 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3550
3551 if kdu_name:
3552 kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
3553
3554 # TODO check if ns is in a proper status
3555 if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
3556 # kdur and desc_params already set from before
3557 if primitive_params:
3558 desc_params.update(primitive_params)
3559 # TODO Check if we will need something at vnf level
3560 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3561 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3562 break
3563 else:
3564 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3565
3566 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3567 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3568 raise LcmException(msg)
3569
3570 db_dict = {"collection": "nsrs",
3571 "filter": {"_id": nsr_id},
3572 "path": "_admin.deployed.K8s.{}".format(index)}
3573 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3574 step = "Executing kdu {}".format(primitive)
3575 if primitive == "upgrade":
3576 if desc_params.get("kdu_model"):
3577 kdu_model = desc_params.get("kdu_model")
3578 del desc_params["kdu_model"]
3579 else:
3580 kdu_model = kdu.get("kdu-model")
3581 parts = kdu_model.split(sep=":")
3582 if len(parts) == 2:
3583 kdu_model = parts[0]
3584
3585 detailed_status = await asyncio.wait_for(
3586 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3587 cluster_uuid=kdu.get("k8scluster-uuid"),
3588 kdu_instance=kdu.get("kdu-instance"),
3589 atomic=True, kdu_model=kdu_model,
3590 params=desc_params, db_dict=db_dict,
3591 timeout=timeout_ns_action),
3592 timeout=timeout_ns_action + 10)
3593 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3594 elif primitive == "rollback":
3595 detailed_status = await asyncio.wait_for(
3596 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3597 cluster_uuid=kdu.get("k8scluster-uuid"),
3598 kdu_instance=kdu.get("kdu-instance"),
3599 db_dict=db_dict),
3600 timeout=timeout_ns_action)
3601 elif primitive == "status":
3602 detailed_status = await asyncio.wait_for(
3603 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3604 cluster_uuid=kdu.get("k8scluster-uuid"),
3605 kdu_instance=kdu.get("kdu-instance")),
3606 timeout=timeout_ns_action)
3607 else:
3608 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3609 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3610
3611 detailed_status = await asyncio.wait_for(
3612 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3613 cluster_uuid=kdu.get("k8scluster-uuid"),
3614 kdu_instance=kdu_instance,
3615 primitive_name=primitive,
3616 params=params, db_dict=db_dict,
3617 timeout=timeout_ns_action),
3618 timeout=timeout_ns_action)
3619
3620 if detailed_status:
3621 nslcmop_operation_state = 'COMPLETED'
3622 else:
3623 detailed_status = ''
3624 nslcmop_operation_state = 'FAILED'
3625 else:
3626 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3627 member_vnf_index=vnf_index,
3628 vdu_id=vdu_id,
3629 vdu_count_index=vdu_count_index)
3630 db_nslcmop_notif = {"collection": "nslcmops",
3631 "filter": {"_id": nslcmop_id},
3632 "path": "admin.VCA"}
3633 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3634 ee_id,
3635 primitive=primitive,
3636 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3637 timeout=timeout_ns_action,
3638 vca_type=vca_type,
3639 db_dict=db_nslcmop_notif)
3640
3641 db_nslcmop_update["detailed-status"] = detailed_status
3642 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3643 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3644 detailed_status))
3645 return # database update is called inside finally
3646
3647 except (DbException, LcmException, N2VCException, K8sException) as e:
3648 self.logger.error(logging_text + "Exit Exception {}".format(e))
3649 exc = e
3650 except asyncio.CancelledError:
3651 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3652 exc = "Operation was cancelled"
3653 except asyncio.TimeoutError:
3654 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3655 exc = "Timeout"
3656 except Exception as e:
3657 exc = traceback.format_exc()
3658 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3659 finally:
3660 if exc:
3661 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3662 "FAILED {}: {}".format(step, exc)
3663 nslcmop_operation_state = "FAILED"
3664 if db_nsr:
3665 self._write_ns_status(
3666 nsr_id=nsr_id,
3667 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3668 current_operation="IDLE",
3669 current_operation_id=None,
3670 # error_description=error_description_nsr,
3671 # error_detail=error_detail,
3672 other_update=db_nsr_update
3673 )
3674
3675 self._write_op_status(
3676 op_id=nslcmop_id,
3677 stage="",
3678 error_message=error_description_nslcmop,
3679 operation_state=nslcmop_operation_state,
3680 other_update=db_nslcmop_update,
3681 )
3682
3683 if nslcmop_operation_state:
3684 try:
3685 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3686 "operationState": nslcmop_operation_state},
3687 loop=self.loop)
3688 except Exception as e:
3689 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3690 self.logger.debug(logging_text + "Exit")
3691 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3692 return nslcmop_operation_state, detailed_status
3693
3694 async def scale(self, nsr_id, nslcmop_id):
3695
3696 # Try to lock HA task here
3697 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3698 if not task_is_locked_by_me:
3699 return
3700
3701 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3702 self.logger.debug(logging_text + "Enter")
3703 # get all needed from database
3704 db_nsr = None
3705 db_nslcmop = None
3706 db_nslcmop_update = {}
3707 nslcmop_operation_state = None
3708 db_nsr_update = {}
3709 exc = None
3710 # in case of error, indicates what part of scale was failed to put nsr at error status
3711 scale_process = None
3712 old_operational_status = ""
3713 old_config_status = ""
3714 vnfr_scaled = False
3715 try:
3716 # wait for any previous tasks in process
3717 step = "Waiting for previous operations to terminate"
3718 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3719
3720 self._write_ns_status(
3721 nsr_id=nsr_id,
3722 ns_state=None,
3723 current_operation="SCALING",
3724 current_operation_id=nslcmop_id
3725 )
3726
3727 step = "Getting nslcmop from database"
3728 self.logger.debug(step + " after having waited for previous tasks to be completed")
3729 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3730 step = "Getting nsr from database"
3731 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3732
3733 old_operational_status = db_nsr["operational-status"]
3734 old_config_status = db_nsr["config-status"]
3735 step = "Parsing scaling parameters"
3736 # self.logger.debug(step)
3737 db_nsr_update["operational-status"] = "scaling"
3738 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3739 nsr_deployed = db_nsr["_admin"].get("deployed")
3740
3741 #######
3742 nsr_deployed = db_nsr["_admin"].get("deployed")
3743 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3744 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3745 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3746 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3747 #######
3748
3749 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3750 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3751 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3752 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3753 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3754
3755 # for backward compatibility
3756 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3757 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3758 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3759 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3760
3761 step = "Getting vnfr from database"
3762 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3763 step = "Getting vnfd from database"
3764 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3765
3766 step = "Getting scaling-group-descriptor"
3767 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3768 if scaling_descriptor["name"] == scaling_group:
3769 break
3770 else:
3771 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3772 "at vnfd:scaling-group-descriptor".format(scaling_group))
3773
3774 # cooldown_time = 0
3775 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3776 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3777 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3778 # break
3779
3780 # TODO check if ns is in a proper status
3781 step = "Sending scale order to VIM"
3782 nb_scale_op = 0
3783 if not db_nsr["_admin"].get("scaling-group"):
3784 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3785 admin_scale_index = 0
3786 else:
3787 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3788 if admin_scale_info["name"] == scaling_group:
3789 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3790 break
3791 else: # not found, set index one plus last element and add new entry with the name
3792 admin_scale_index += 1
3793 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3794 RO_scaling_info = []
3795 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3796 if scaling_type == "SCALE_OUT":
3797 # count if max-instance-count is reached
3798 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3799 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3800 if nb_scale_op >= max_instance_count:
3801 raise LcmException("reached the limit of {} (max-instance-count) "
3802 "scaling-out operations for the "
3803 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3804
3805 nb_scale_op += 1
3806 vdu_scaling_info["scaling_direction"] = "OUT"
3807 vdu_scaling_info["vdu-create"] = {}
3808 for vdu_scale_info in scaling_descriptor["vdu"]:
3809 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3810 "type": "create", "count": vdu_scale_info.get("count", 1)})
3811 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3812
3813 elif scaling_type == "SCALE_IN":
3814 # count if min-instance-count is reached
3815 min_instance_count = 0
3816 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3817 min_instance_count = int(scaling_descriptor["min-instance-count"])
3818 if nb_scale_op <= min_instance_count:
3819 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3820 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3821 nb_scale_op -= 1
3822 vdu_scaling_info["scaling_direction"] = "IN"
3823 vdu_scaling_info["vdu-delete"] = {}
3824 for vdu_scale_info in scaling_descriptor["vdu"]:
3825 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3826 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3827 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3828
3829 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3830 vdu_create = vdu_scaling_info.get("vdu-create")
3831 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3832 if vdu_scaling_info["scaling_direction"] == "IN":
3833 for vdur in reversed(db_vnfr["vdur"]):
3834 if vdu_delete.get(vdur["vdu-id-ref"]):
3835 vdu_delete[vdur["vdu-id-ref"]] -= 1
3836 vdu_scaling_info["vdu"].append({
3837 "name": vdur["name"],
3838 "vdu_id": vdur["vdu-id-ref"],
3839 "interface": []
3840 })
3841 for interface in vdur["interfaces"]:
3842 vdu_scaling_info["vdu"][-1]["interface"].append({
3843 "name": interface["name"],
3844 "ip_address": interface["ip-address"],
3845 "mac_address": interface.get("mac-address"),
3846 })
3847 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3848
3849 # PRE-SCALE BEGIN
3850 step = "Executing pre-scale vnf-config-primitive"
3851 if scaling_descriptor.get("scaling-config-action"):
3852 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3853 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3854 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3855 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3856 step = db_nslcmop_update["detailed-status"] = \
3857 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3858
3859 # look for primitive
3860 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3861 if config_primitive["name"] == vnf_config_primitive:
3862 break
3863 else:
3864 raise LcmException(
3865 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3866 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3867 "primitive".format(scaling_group, config_primitive))
3868
3869 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3870 if db_vnfr.get("additionalParamsForVnf"):
3871 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3872
3873 scale_process = "VCA"
3874 db_nsr_update["config-status"] = "configuring pre-scaling"
3875 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3876
3877 # Pre-scale retry check: Check if this sub-operation has been executed before
3878 op_index = self._check_or_add_scale_suboperation(
3879 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3880 if op_index == self.SUBOPERATION_STATUS_SKIP:
3881 # Skip sub-operation
3882 result = 'COMPLETED'
3883 result_detail = 'Done'
3884 self.logger.debug(logging_text +
3885 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3886 vnf_config_primitive, result, result_detail))
3887 else:
3888 if op_index == self.SUBOPERATION_STATUS_NEW:
3889 # New sub-operation: Get index of this sub-operation
3890 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3891 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3892 format(vnf_config_primitive))
3893 else:
3894 # retry: Get registered params for this existing sub-operation
3895 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3896 vnf_index = op.get('member_vnf_index')
3897 vnf_config_primitive = op.get('primitive')
3898 primitive_params = op.get('primitive_params')
3899 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3900 format(vnf_config_primitive))
3901 # Execute the primitive, either with new (first-time) or registered (reintent) args
3902 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3903 member_vnf_index=vnf_index,
3904 vdu_id=None,
3905 vdu_count_index=None)
3906 result, result_detail = await self._ns_execute_primitive(
3907 ee_id, vnf_config_primitive, primitive_params, vca_type)
3908 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3909 vnf_config_primitive, result, result_detail))
3910 # Update operationState = COMPLETED | FAILED
3911 self._update_suboperation_status(
3912 db_nslcmop, op_index, result, result_detail)
3913
3914 if result == "FAILED":
3915 raise LcmException(result_detail)
3916 db_nsr_update["config-status"] = old_config_status
3917 scale_process = None
3918 # PRE-SCALE END
3919
3920 # SCALE RO - BEGIN
3921 # Should this block be skipped if 'RO_nsr_id' == None ?
3922 # if (RO_nsr_id and RO_scaling_info):
3923 if RO_scaling_info:
3924 scale_process = "RO"
3925 # Scale RO retry check: Check if this sub-operation has been executed before
3926 op_index = self._check_or_add_scale_suboperation(
3927 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3928 if op_index == self.SUBOPERATION_STATUS_SKIP:
3929 # Skip sub-operation
3930 result = 'COMPLETED'
3931 result_detail = 'Done'
3932 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3933 result, result_detail))
3934 else:
3935 if op_index == self.SUBOPERATION_STATUS_NEW:
3936 # New sub-operation: Get index of this sub-operation
3937 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3938 self.logger.debug(logging_text + "New sub-operation RO")
3939 else:
3940 # retry: Get registered params for this existing sub-operation
3941 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3942 RO_nsr_id = op.get('RO_nsr_id')
3943 RO_scaling_info = op.get('RO_scaling_info')
3944 self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
3945 vnf_config_primitive))
3946
3947 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3948 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3949 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3950 # wait until ready
3951 RO_nslcmop_id = RO_desc["instance_action_id"]
3952 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3953
3954 RO_task_done = False
3955 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3956 detailed_status_old = None
3957 self.logger.debug(logging_text + step)
3958
3959 deployment_timeout = 1 * 3600 # One hour
3960 while deployment_timeout > 0:
3961 if not RO_task_done:
3962 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3963 extra_item_id=RO_nslcmop_id)
3964
3965 # deploymentStatus
3966 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3967
3968 ns_status, ns_status_info = self.RO.check_action_status(desc)
3969 if ns_status == "ERROR":
3970 raise ROclient.ROClientException(ns_status_info)
3971 elif ns_status == "BUILD":
3972 detailed_status = step + "; {}".format(ns_status_info)
3973 elif ns_status == "ACTIVE":
3974 RO_task_done = True
3975 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3976 self.logger.debug(logging_text + step)
3977 else:
3978 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3979 else:
3980
3981 if ns_status == "ERROR":
3982 raise ROclient.ROClientException(ns_status_info)
3983 elif ns_status == "BUILD":
3984 detailed_status = step + "; {}".format(ns_status_info)
3985 elif ns_status == "ACTIVE":
3986 step = detailed_status = \
3987 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3988 if not vnfr_scaled:
3989 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3990 vnfr_scaled = True
3991 try:
3992 desc = await self.RO.show("ns", RO_nsr_id)
3993
3994 # deploymentStatus
3995 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3996
3997 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3998 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3999 break
4000 except LcmExceptionNoMgmtIP:
4001 pass
4002 else:
4003 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4004 if detailed_status != detailed_status_old:
4005 self._update_suboperation_status(
4006 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4007 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4008 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4009
4010 await asyncio.sleep(5, loop=self.loop)
4011 deployment_timeout -= 5
4012 if deployment_timeout <= 0:
4013 self._update_suboperation_status(
4014 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4015 raise ROclient.ROClientException("Timeout waiting ns to be ready")
4016
4017 # update VDU_SCALING_INFO with the obtained ip_addresses
4018 if vdu_scaling_info["scaling_direction"] == "OUT":
4019 for vdur in reversed(db_vnfr["vdur"]):
4020 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4021 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4022 vdu_scaling_info["vdu"].append({
4023 "name": vdur["name"],
4024 "vdu_id": vdur["vdu-id-ref"],
4025 "interface": []
4026 })
4027 for interface in vdur["interfaces"]:
4028 vdu_scaling_info["vdu"][-1]["interface"].append({
4029 "name": interface["name"],
4030 "ip_address": interface["ip-address"],
4031 "mac_address": interface.get("mac-address"),
4032 })
4033 del vdu_scaling_info["vdu-create"]
4034
4035 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4036 # SCALE RO - END
4037
4038 scale_process = None
4039 if db_nsr_update:
4040 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4041
4042 # POST-SCALE BEGIN
4043 # execute primitive service POST-SCALING
4044 step = "Executing post-scale vnf-config-primitive"
4045 if scaling_descriptor.get("scaling-config-action"):
4046 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4047 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4048 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4049 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4050 step = db_nslcmop_update["detailed-status"] = \
4051 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4052
4053 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4054 if db_vnfr.get("additionalParamsForVnf"):
4055 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4056
4057 # look for primitive
4058 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4059 if config_primitive["name"] == vnf_config_primitive:
4060 break
4061 else:
4062 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
4063 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
4064 "match any vnf-configuration:config-primitive".format(scaling_group,
4065 config_primitive))
4066 scale_process = "VCA"
4067 db_nsr_update["config-status"] = "configuring post-scaling"
4068 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4069
4070 # Post-scale retry check: Check if this sub-operation has been executed before
4071 op_index = self._check_or_add_scale_suboperation(
4072 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4073 if op_index == self.SUBOPERATION_STATUS_SKIP:
4074 # Skip sub-operation
4075 result = 'COMPLETED'
4076 result_detail = 'Done'
4077 self.logger.debug(logging_text +
4078 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4079 format(vnf_config_primitive, result, result_detail))
4080 else:
4081 if op_index == self.SUBOPERATION_STATUS_NEW:
4082 # New sub-operation: Get index of this sub-operation
4083 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4084 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4085 format(vnf_config_primitive))
4086 else:
4087 # retry: Get registered params for this existing sub-operation
4088 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4089 vnf_index = op.get('member_vnf_index')
4090 vnf_config_primitive = op.get('primitive')
4091 primitive_params = op.get('primitive_params')
4092 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4093 format(vnf_config_primitive))
4094 # Execute the primitive, either with new (first-time) or registered (reintent) args
4095 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4096 member_vnf_index=vnf_index,
4097 vdu_id=None,
4098 vdu_count_index=None)
4099 result, result_detail = await self._ns_execute_primitive(
4100 ee_id, vnf_config_primitive, primitive_params, vca_type)
4101 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4102 vnf_config_primitive, result, result_detail))
4103 # Update operationState = COMPLETED | FAILED
4104 self._update_suboperation_status(
4105 db_nslcmop, op_index, result, result_detail)
4106
4107 if result == "FAILED":
4108 raise LcmException(result_detail)
4109 db_nsr_update["config-status"] = old_config_status
4110 scale_process = None
4111 # POST-SCALE END
4112
4113 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4114 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4115 else old_operational_status
4116 db_nsr_update["config-status"] = old_config_status
4117 return
4118 except (ROclient.ROClientException, DbException, LcmException) as e:
4119 self.logger.error(logging_text + "Exit Exception {}".format(e))
4120 exc = e
4121 except asyncio.CancelledError:
4122 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4123 exc = "Operation was cancelled"
4124 except Exception as e:
4125 exc = traceback.format_exc()
4126 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4127 finally:
4128 self._write_ns_status(
4129 nsr_id=nsr_id,
4130 ns_state=None,
4131 current_operation="IDLE",
4132 current_operation_id=None
4133 )
4134 if exc:
4135 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4136 nslcmop_operation_state = "FAILED"
4137 if db_nsr:
4138 db_nsr_update["operational-status"] = old_operational_status
4139 db_nsr_update["config-status"] = old_config_status
4140 db_nsr_update["detailed-status"] = ""
4141 if scale_process:
4142 if "VCA" in scale_process:
4143 db_nsr_update["config-status"] = "failed"
4144 if "RO" in scale_process:
4145 db_nsr_update["operational-status"] = "failed"
4146 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4147 exc)
4148 else:
4149 error_description_nslcmop = None
4150 nslcmop_operation_state = "COMPLETED"
4151 db_nslcmop_update["detailed-status"] = "Done"
4152
4153 self._write_op_status(
4154 op_id=nslcmop_id,
4155 stage="",
4156 error_message=error_description_nslcmop,
4157 operation_state=nslcmop_operation_state,
4158 other_update=db_nslcmop_update,
4159 )
4160 if db_nsr:
4161 self._write_ns_status(
4162 nsr_id=nsr_id,
4163 ns_state=None,
4164 current_operation="IDLE",
4165 current_operation_id=None,
4166 other_update=db_nsr_update
4167 )
4168
4169 if nslcmop_operation_state:
4170 try:
4171 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4172 "operationState": nslcmop_operation_state},
4173 loop=self.loop)
4174 # if cooldown_time:
4175 # await asyncio.sleep(cooldown_time, loop=self.loop)
4176 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4177 except Exception as e:
4178 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4179 self.logger.debug(logging_text + "Exit")
4180 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")