fix 1043 ns-action: set default timeout
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
# Default timeouts, expressed in seconds
timeout_vca_on_error = 5 * 60   # Time a charm may stay at blocked/error status, since first seen, before marking it failed
timeout_ns_deploy = 2 * 3600   # default global timeout for deploying a ns
timeout_ns_terminate = 1800   # default global timeout for undeploying a ns
timeout_charm_delete = 10 * 60   # NOTE(review): presumably the timeout for deleting a charm; usage not visible here
timeout_primitive = 10 * 60   # timeout for primitive execution
timeout_progress_primitive = 2 * 60   # timeout for some progress in a primitive execution

# Negative markers for sub-operation lookups
# NOTE(review): presumably returned instead of a list index; confirm against the callers
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
# Label for the VCA deployment task
task_name_deploy_vca = "Deploying VCA"
58
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Builds the NS lifecycle manager together with the connectors it uses: N2VC (juju), K8s helm, K8s juju and RO
    :param db: database object, given to the base class and to every connector
    :param msg: messaging object, given to the base class
    :param fs: filesystem storage object, given to the base class and to every connector
    :param lcm_tasks: task registry, stored at self.lcm_tasks
    :param config: dictionary with configuration. The keys read here are 'timeout', 'ro_config' and 'VCA'
    :param loop: loop object handed to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy, so the caller's 'VCA' section is not shared with this object
    self.vca_config = config["VCA"].copy()

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    vca_user = self.vca_config.get('user', None)
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca_user,
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db
    )

    # create the K8s connectors; none of them reports database updates back
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # several type names resolve to the same connector
    self.k8scluster_map = {}
    for helm_alias in ("helm-chart", "chart"):
        self.k8scluster_map[helm_alias] = self.k8sclusterhelm
    for juju_alias in ("juju-bundle", "juju"):
        self.k8scluster_map[juju_alias] = self.k8sclusterjuju

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
116
117 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
118
119 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
120
121 try:
122 # TODO filter RO descriptor fields...
123
124 # write to database
125 db_dict = dict()
126 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
127 db_dict['deploymentStatus'] = ro_descriptor
128 self.update_db_2("nsrs", nsrs_id, db_dict)
129
130 except Exception as e:
131 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
132
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback given to the N2VC connector. It reads the VCA status of the NS and stores at the nsr record:
    'vcaStatus', the configurationStatus of the VCA pointed by path, 'errorDescription' and 'nsState'
    (READY <-> DEGRADED). Any failure is logged as a warning and it is not propagated to the caller
    :param table: not used here; the record is always read from and written to 'nsrs'
    :param filter: database filter of the nsr. Its '_id' is used as the nsr id
    :param path: dotted path whose last component is taken as the index of the VCA, e.g. '_admin.deployed.VCA.3'
    :param updated_data: not used here
    :return: None
    """
    # remove last dot from path (if exists)
    if path.endswith('.'):
        path = path[:-1]

    # bound before the try, so the final except can always report it
    nsr_id = None
    try:
        nsr_id = filter.get('_id')

        # read ns record from database
        nsr = self.db.get_one(table='nsrs', q_filter=filter)
        current_ns_status = nsr.get('nsState')

        # get vca status for NS
        status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)

        # vcaStatus
        db_dict = {'vcaStatus': status_dict}

        # update configurationStatus for this VCA
        try:
            vca_index = int(path[path.rfind(".")+1:])

            vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
            vca_status = vca_list[vca_index].get('status')

            configuration_status_list = nsr.get('configurationStatus')
            config_status = configuration_status_list[vca_index].get('status')

            # db_dict has no 'configurationStatus' entry to index into, so the element is addressed
            # with a dotted key, as done for the rest of partial updates of list items
            config_status_key = 'configurationStatus.{}.status'.format(vca_index)
            if config_status == 'BROKEN' and vca_status != 'failed':
                db_dict[config_status_key] = 'READY'
            elif config_status != 'BROKEN' and vca_status == 'failed':
                db_dict[config_status_key] = 'BROKEN'
        except Exception as e:
            # not update configurationStatus (e.g. path does not end with a VCA index)
            self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        is_degraded = False
        if current_ns_status in ('READY', 'DEGRADED'):
            error_description = ''
            # check machines
            machines = status_dict.get('machines')
            if machines:
                for machine_id in machines:
                    machine = machines.get(machine_id)
                    # check machine agent-status
                    if machine.get('agent-status'):
                        s = machine.get('agent-status').get('status')
                        if s != 'started':
                            is_degraded = True
                            error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
                    # check machine instance status
                    if machine.get('instance-status'):
                        s = machine.get('instance-status').get('status')
                        if s != 'running':
                            is_degraded = True
                            error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
            # check applications
            applications = status_dict.get('applications')
            if applications:
                for app_id in applications:
                    app = applications.get(app_id)
                    # check application status
                    if app.get('status'):
                        s = app.get('status').get('status')
                        if s != 'active':
                            is_degraded = True
                            error_description += 'application {} status={} ; '.format(app_id, s)

            if error_description:
                db_dict['errorDescription'] = error_description
            if current_ns_status == 'READY' and is_degraded:
                db_dict['nsState'] = 'DEGRADED'
            if current_ns_status == 'DEGRADED' and not is_degraded:
                db_dict['nsState'] = 'READY'

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error updating NS state for ns={}: {}'.format(nsr_id, e))
221
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Generates the vnfd descriptor to be sent to RO out of an OSM IM vnfd. The input is not modified.
    Keys not used by RO are removed and the cloud-init (inline or read from file) of every vdu is rendered with
    Jinja2 using the instantiation parameters
    :param vnfd: input vnfd
    :param new_id: if provided, it replaces the 'id' of the generated descriptor
    :param additionalParams: instantiation params of the VNF, used as variables for the cloud-init templates
    :param nsrId: Id of the NSR. Not used here
    :return: a modified deep copy of vnfd
    :raises LcmException: if the cloud-init file cannot be read, the template cannot be processed or it uses a
        variable that is not present at additionalParams
    """
    keys_unused_by_ro = ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                         "kdu", "k8s-cluster")
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in keys_unused_by_ro:
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        template_vars = additionalParams or {}

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                # the content travels inline at 'cloud-init', so the file reference is dropped
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable referenced by the template must come at the instantiation parameters
            parsed_content = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_content):
                if var not in template_vars:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(template_vars)

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
279
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map member-vnf-index to a vnfd
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: list of keys to be injected as 'mgmt_keys' at the vdus that require ssh access
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM/WIM account is not ENABLED, or if a parameter references a member-vnf-index,
        an internal connection point or a connection point that is not present at the descriptors
    """
    # caches of OSM account id -> RO id, to query the database only once per account
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only a string is an account id to be translated; any other value is passed to RO as it is
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # renames the OSM keys/values to the ones used by RO, over a copy
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the keys at every vdu that requires ssh access for its configuration
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # a vdu without interfaces is skipped instead of failing when iterating None
                for vdu_interface in get_iterable(vdu, "interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        # only the first vdu attached to the management cp gets the keys
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # all the vdus were inspected without finding the interface
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor are the ones left by the search above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
550
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and writes the resulting 'vdur' list to the database
    :param db_vnfr: content of the vnfr. Its 'vdur' list is modified in place
    :param vdu_create: dictionary with vdu-id-ref: number of entries to add. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of entries to remove. It is not modified
    :return: None
    :raises LcmException: if some requested vdu-id-ref is not found among the existing vdur
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # the list is walked backwards, so inserting or deleting does not shift the entries not visited yet
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # each new entry is a clone of the previous one with the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done; each pending request is reported with its own message
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
589
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills every vld of the nsr with the info that RO reports for its network (vim id, name, status and error)
    :param ns_update_nsr: dictionary to be filled with the changes, using keys 'vld.<index>'
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its 'nets' are matched by 'ns_net_osm_id' against the vld 'id'
    :return: Nothing, LcmException is raised if some vld has no network at the RO info
    """
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO network that belongs to this vld, if any
        ro_net = next((net for net in get_iterable(nsr_desc_RO, "nets") if net.get("ns_net_osm_id") == vld["id"]),
                      None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld.update({
            "vim-id": ro_net.get("vim_net_id"),
            "name": ro_net.get("vim_name"),
            "status": ro_net.get("status"),
            "status-detailed": ro_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
611
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Marks every vnfr as ERROR, together with those of its vdur that have no status yet, and writes it to database
    :param db_vnfrs: dictionary whose values are the vnfr contents. The vdur entries are also modified
    :param error_text: if not empty, it is stored as 'status-detailed' of the vdur marked as ERROR
    :return: None. A DbException is logged as an error and it is not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # a vdur that already has a status is left untouched
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # database and in-memory record must get the same detail, not a literal "ERROR"
                        detailed_status = str(error_text)
                        vdur["status-detailed"] = detailed_status
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = detailed_status
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
626
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Fills every vnfr with the info provided by RO (ip addresses, vim ids, names, status...) and writes the changes
    to the 'vnfrs' collection. The content of db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' and 'nets' are used
    :return: Nothing, LcmException is raised on errors (LcmExceptionNoMgmtIP when a vnf with vdur has no IP)
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # RO entry of this vnf
        ro_vnf = next((item for item in nsr_desc_RO["vnfs"] if item["member_vnf_index"] == vnf_index), None)
        if ro_vnf is None:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))

        changes = {}
        if ro_vnf.get("ip_address"):
            # only the first of the addresses separated by ';' is taken
            first_ip = ro_vnf["ip_address"].split(";")[0]
            db_vnfr["ip-address"] = first_ip
            changes["ip-address"] = first_ip
        elif not db_vnfr.get("ip-address") and db_vnfr.get("vdur"):
            # if not VDUs, there is not ip_address
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

        for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
            if vdur.get("pdu-type"):
                continue
            # the N-th RO vm of the same vdu corresponds to the vdur with count-index N
            replicas_skipped = 0
            for ro_vm in get_iterable(ro_vnf, "vms"):
                if vdur["vdu-id-ref"] != ro_vm["vdu_osm_id"]:
                    continue
                if vdur["count-index"] != replicas_skipped:
                    replicas_skipped += 1
                    continue

                vdur["vim-id"] = ro_vm.get("vim_vm_id")
                vm_ip = ro_vm.get("ip_address")
                vdur["ip-address"] = ro_vm["ip_address"].split(";")[0] if vm_ip else None
                vdur["vdu-id-ref"] = ro_vm.get("vdu_osm_id")
                vdur["name"] = ro_vm.get("vim_name")
                vdur["status"] = ro_vm.get("status")
                vdur["status-detailed"] = ro_vm.get("error_msg")

                for ifacer in get_iterable(vdur, "interfaces"):
                    ro_iface = next((item for item in get_iterable(ro_vm, "interfaces")
                                     if item.get("internal_name") == ifacer["name"]), None)
                    if ro_iface is None:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                           "from VIM info"
                                           .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    ifacer["ip-address"] = ro_iface.get("ip_address")
                    ifacer["mac-address"] = ro_iface.get("mac_address")

                changes["vdur.{}".format(vdu_index)] = vdur
                break
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                   "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

        for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
            ro_net = next((item for item in get_iterable(nsr_desc_RO, "nets")
                           if item.get("vnf_net_osm_id") == vld["id"]), None)
            if ro_net is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                    vnf_index, vld["id"]))
            vld["vim-id"] = ro_net.get("vim_net_id")
            vld["name"] = ro_net.get("vim_name")
            vld["status"] = ro_net.get("status")
            vld["status-detailed"] = ro_net.get("error_msg")
            changes["vld.{}".format(vld_index)] = vld

        self.update_db_2("vnfrs", db_vnfr["_id"], changes)
699
700 def _get_ns_config_info(self, nsr_id):
701 """
702 Generates a mapping between vnf,vdu elements and the N2VC id
703 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
704 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
705 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
706 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
707 """
708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
709 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
710 mapping = {}
711 ns_config_info = {"osm-config-mapping": mapping}
712 for vca in vca_deployed_list:
713 if not vca["member-vnf-index"]:
714 continue
715 if not vca["vdu_id"]:
716 mapping[vca["member-vnf-index"]] = vca["application"]
717 else:
718 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
719 vca["application"]
720 return ns_config_info
721
722 @staticmethod
723 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
724 """
725 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
726 primitives as verify-ssh-credentials, or config when needed
727 :param desc_primitive_list: information of the descriptor
728 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
729 this element contains a ssh public key
730 :return: The modified list. Can ba an empty list, but always a list
731 """
732 if desc_primitive_list:
733 primitive_list = desc_primitive_list.copy()
734 else:
735 primitive_list = []
736 # look for primitive config, and get the position. None if not present
737 config_position = None
738 for index, primitive in enumerate(primitive_list):
739 if primitive["name"] == "config":
740 config_position = index
741 break
742
743 # for NS, add always a config primitive if not present (bug 874)
744 if not vca_deployed["member-vnf-index"] and config_position is None:
745 primitive_list.insert(0, {"name": "config", "parameter": []})
746 config_position = 0
747 # for VNF/VDU add verify-ssh-credentials after config
748 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
749 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
750 return primitive_list
751
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Deploy the network service at RO and wait until RO reports it ready.

    Steps, in order: optional placement, creation (or reuse) of one RO vnfd per constituent-vnfd, creation
    (or reuse) of the RO nsd, creation (or reuse) of the RO ns, and polling of the RO ns every 5 seconds until
    it is ACTIVE and the VNFRs can be updated, or until the deployment timeout expires.
    Progress is written to the "nsrs" record and to the operation through update_db_2/_write_op_status.
    :param logging_text: prefix used for logging
    :param nsr_id: _id of the nsr being deployed
    :param nsd: ns descriptor; a modified deep copy of it is what is sent to RO
    :param db_nsr: nsr database content; its _admin.deployed.RO.vnfd list is modified in place
    :param db_nslcmop: operation database content; "_id" and "operationParams" are read
    :param db_vnfrs: vnfrs content indexed by member-vnf-index
    :param db_vnfds_ref: vnfds content indexed by the 'vnfd-id-ref' of each constituent-vnfd
    :param n2vc_key_list: forwarded to ns_params_2_RO
    :param stage: list whose item [2] is overwritten with the current progress text
    :return: None
    :raises: ROclient.ROClientException, LcmException or DbException, re-raised after setting stage[2] to an
        error text and calling set_vnfr_at_error
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0  # number of descriptors created at RO
        vnf_index_2_RO_id = {}  # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # timeout precedence: operation parameter, then self.timeout["ns_deploy"], then the class default
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        await self.do_placement(logging_text, db_nslcmop, db_vnfrs)

        # deploy RO

        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            # id used at RO: nsr id, a running counter and the member index truncated to 23 characters
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                # not found: reserve a new slot at the end; it is filled in below with RO_update
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            # keep the pending database update and the in-memory nsr in sync
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # send a copy of the nsd in which every vnfd reference points to the vnfd id used at RO
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        # NOTE(review): same text as the nsd step above; presumably it should mention the ns - confirm
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                # only "not found" is tolerated: the stored id is forgotten and a new ns is created below
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    # an ns in error is deleted at RO so that it is created again below
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait look if previous tasks in process
            # NOTE(review): ns_params is subscripted here although the timeout code above tolerates it being
            # empty - confirm that operationParams is always present at this point
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                    if task_dependency:
                        stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                        self.logger.debug(logging_text + stage[2])
                        await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        # the timeout is counted from the start of this method, not from the start of this loop
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "VIM: ({})".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                    try:
                        self.ns_update_vnfr(db_vnfrs, desc)
                        # vnfrs updated without complaints: leave the loop skipping its 'else' clause
                        break
                    except LcmExceptionNoMgmtIP:
                        # no management IP yet: keep polling
                        pass
                else:
                    # NOTE(review): AssertionError is not in the tuple handled at the end of this method and
                    # assert statements are removed when running with -O
                    assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                # write to database only when the progress text has changed
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
            # NOTE(review): the 'loop' argument of asyncio.sleep was removed in Python 3.10
            await asyncio.sleep(5, loop=self.loop)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException) as e:
        # report the failure in the stage text and in the vnfrs, then let the caller handle the exception
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
970
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine.
    The vnfr (and the nsr when a key has to be injected) is read from database once every 10 seconds.
    :param logging_text: prefix use for logging
    :param nsr_id: _id used to read the "nsrs" record, where the ns id at RO is looked for
    :param vnfr_id: _id used to read the "vnfrs" record
    :param vdu_id: 'vdu-id-ref' of the target vdur; a false value selects the VNF case, where the target is the
        vdur holding the ip-address of the vnfr
    :param vdu_index: 'count-index' of the target vdur, only compared in the VDU case
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: if the target VNF/VM is in ERROR status, the target vdur is not found, the response of
        RO is not understood, the key injection fails 20 times or the loop has been started 360 times
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0  # number of failed key injections
    target_vdu_id = None  # stays None until the target vdur has an ip address
    ro_retries = 0  # number of loop iterations started, whatever is being waited for

    while True:

        ro_retries += 1
        # NOTE(review): this counter limits every kind of wait of this loop, although the message only talks
        # about the RO nsr_id
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # NOTE(review): the 'loop' argument of asyncio.sleep was removed in Python 3.10
        await asyncio.sleep(10, loop=self.loop)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            # a vdur with 'pdu-type' is accepted whatever its status is
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                # for a PDU the address is returned without injecting the key
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                # only the first entry is evaluated: either it is a 200 or an exception is raised
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # the injection is tried again in the next iteration, after the 10 seconds sleep
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            # no key to inject: having the ip address is enough
            break

    return ip_address
1072
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Hold the configuration of one VCA until the configurations it depends on are ready.

    A vdu or kdu VCA has no dependencies and returns at once. A VCA without 'member-vnf-index' waits for every
    other entry of 'configurationStatus'; a VCA with 'member-vnf-index' waits only for the entries having the
    same 'member-vnf-index'. The nsr is read from database every 10 seconds, up to 301 times.
    :param nsr_id: _id of the nsr whose 'configurationStatus' is polled
    :param vca_deployed_list: list from which the own VCA is taken
    :param vca_index: position of the own VCA, skipped when looking at 'configurationStatus'
    :return: None, once nothing is pending
    :raises LcmException: if a dependency is 'BROKEN' or the polling is exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    polls_left = 300
    while polls_left >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # the value is not read afterwards, but the lookup is kept so a record without this list still fails
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]

        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            own_member_index = own_vca.get("member-vnf-index")
            if own_member_index and entry.get("member-vnf-index") != own_member_index:
                # entry of a different vnf: not a dependency
                continue
            entry_status = entry.get("status")
            if entry_status == 'READY':
                continue
            if entry_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            # any other status: still in progress, poll again later
            pending = True
            break

        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        polls_left -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1106
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
    """
    Deploy one VCA (charm) through N2VC and run its initial config primitives.

    Steps, in order: create (proxy charm) or register (native charm) the execution environment, install the
    configuration software, add the VCA relations, wait for the VM ip address injecting the ssh key if it is
    required (proxy charm only), wait for the VCAs this one depends on and execute the initial config
    primitives. The progress is written at 'configurationStatus' of the nsr, ending in 'READY' or 'BROKEN'.
    :param logging_text: prefix used for logging
    :param vca_index: position of this VCA at db_nsr _admin.deployed.VCA
    :param nsi_id: used as first part of the namespace, empty text if it has no value
    :param db_nsr: nsr database content
    :param db_vnfr: vnfr database content; a false value means that this is a NS level configuration
    :param vdu_id: vdu identifier; a false value means that this is not a VDU level configuration
    :param kdu_name: not read by this method
    :param vdu_index: vdu index, taken as 0 when it has a false value
    :param config_descriptor: configuration section of the descriptor; 'juju', 'config-access',
        'initial-config-primitive' and 'terminate-config-primitive' are read
    :param deploy_params: parameters for the primitives; 'rw_mgmt_ip' and 'ns_config_info' are written here
    :param base_folder: dictionary with 'folder' and 'pkg-dir', used to compose the charm path
    :param nslcmop_id: operation id, used to write the operation status
    :param stage: list whose item [0] is overwritten before executing the primitives
    :return: None
    :raises LcmException: wrapping any exception raised in the process, with the current step as prefix
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    # handed over to every n2vc call of this method
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # text of the step in progress, used to compose the error message if something fails
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace grows with the level: "<nsi>.<ns>", "<nsi>.<ns>.<vnfr>" or "<nsi>.<ns>.<vnfr>.<vdu>-<index>"
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor contains juju.proxy with the value False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # native charm: the VM itself is registered, so its address and a user name are needed first
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            # merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # a failure while sorting is only logged: the primitives keep the order they had
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        # True until the first primitive has been executed
        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # the traceback is logged only for the exception types that are not listed here
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        # whatever the original type was, the caller gets a LcmException chained to it
        raise LcmException("{} {}".format(step, e)) from e
1356
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    A database error is logged as a warning and not propagated.
    :param nsr_id: _id of the "nsrs" record to update
    :param ns_state: written at "nsState" only if it has a true value
    :param current_operation: written at "currentOperation"; "_admin.operation-type" gets the same value, or
        None when it is "IDLE"
    :param current_operation_id: written at "currentOperationID", "_admin.current-operation" and "_admin.nslcmop"
    :param error_description: written at "errorDescription", also when it is None
    :param error_detail: written at "errorDetail", also when it is None
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None
    """
    try:
        # a non empty other_update is extended in place: it is the very dictionary sent to database
        db_dict = other_update or {}
        db_dict.update({
            "_admin.nslcmop": current_operation_id,  # for backward compatibility
            "_admin.current-operation": current_operation_id,
            "_admin.operation-type": current_operation if current_operation != "IDLE" else None,
            "currentOperation": current_operation,
            "currentOperationID": current_operation_id,
            "errorDescription": error_description,
            "errorDetail": error_detail,
        })

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1385
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the status fields of an operation at the "nslcmops" collection.
    A database error is logged as a warning and not propagated.
    :param op_id: _id of the "nslcmops" record to update
    :param stage: if it is a list, item [0] is written at 'stage' and all the items joined by spaces at
        'detailed-status'; any other value different from None is written at 'stage' as text
    :param error_message: written at 'errorMessage' if it is not None
    :param queuePosition: always written at 'queuePosition'
    :param operation_state: if it is not None, it is written at 'operationState' and the current time at
        'statusEnteredTime'
    :param other_update: other changes to apply in the same database update; when it is not empty it is
        extended in place
    :return: None
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1405
def _write_all_config_status(self, nsr_id: str, status: str):
    """
    Set the same status to every entry of 'configurationStatus' of a nsr.
    The record is read from database and the complete list is written back. Nothing is written if the record
    has no 'configurationStatus' or it is empty. A database error is logged as a warning and not propagated.
    :param nsr_id: _id of the "nsrs" record
    :param status: value for the 'status' key of every entry
    :return: None
    """
    try:
        # nsrs record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # update status
            for entry in config_status:
                entry['status'] = status
            self.update_db_2("nsrs", nsr_id, {'configurationStatus': list(config_status)})

    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1423
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None):
    """
    Update one entry of 'configurationStatus' of a nsr.
    Only the arguments with a true value are written. A database error is logged as a warning and not
    propagated.
    :param nsr_id: _id of the "nsrs" record
    :param vca_index: position of the entry at 'configurationStatus'
    :param status: value for the 'status' key of the entry
    :param element_under_configuration: value for the 'elementUnderConfiguration' key of the entry
    :param element_type: value for the 'elementType' key of the entry
    :return: None
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    # .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        candidates = (
            ('status', status),
            ('elementUnderConfiguration', element_under_configuration),
            ('elementType', element_type),
        )
        db_dict = {db_path + field: value for field, value in candidates if value}
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1443
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement to the "PLA" engine, if the operation asks for it, and store the result at the vnfrs.

    Nothing is done unless operationParams.placement-engine of the operation is "PLA". Otherwise a
    "get_placement" message is written at topic "pla" and the operation is read from database every 5 seconds
    until it contains _admin.pla or the wait is exhausted. Then, the 'vimAccountId' of every vnf of the result
    is written at 'vim-account-id' of the matching vnfr at database.
    :param logging_text: prefix used for logging
    :param db_nslcmop: operation database content
    :param db_vnfrs: vnfrs content indexed by member-vnf-index, used to get the _id of each vnfr
    :return: None
    :raises LcmException: if there is no placement result when the wait finishes
    """
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        return

    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)

    poll_seconds = 5
    remaining = poll_seconds * 4
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        # the operation is read again: the result is expected at its _admin.pla
        db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))

    for placed_vnf in placement['vnf']:
        target_vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        # entries without vim account, or for a vnf that is not in db_vnfrs, are ignored
        if placed_vnf.get('vimAccountId') and target_vnfr:
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]},
                            {"vim-account-id": placed_vnf['vimAccountId']})
1467
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the operation that requested it.
    The content of 'placement' is written at _admin.pla of the "nslcmops" record whose _id is
    placement.nslcmopId. Any failure is logged as a warning and not propagated.
    :param params: dictionary with the key 'placement'
    :return: None
    """
    # bound before the try, so that the handler can report it even if the first statement fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1474
1475 async def instantiate(self, nsr_id, nslcmop_id):
1476 """
1477
1478 :param nsr_id: ns instance to deploy
1479 :param nslcmop_id: operation to run
1480 :return:
1481 """
1482
1483 # Try to lock HA task here
1484 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1485 if not task_is_locked_by_me:
1486 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1487 return
1488
1489 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1490 self.logger.debug(logging_text + "Enter")
1491
1492 # get all needed from database
1493
1494 # database nsrs record
1495 db_nsr = None
1496
1497 # database nslcmops record
1498 db_nslcmop = None
1499
1500 # update operation on nsrs
1501 db_nsr_update = {}
1502 # update operation on nslcmops
1503 db_nslcmop_update = {}
1504
1505 nslcmop_operation_state = None
1506 db_vnfrs = {} # vnf's info indexed by member-index
1507 # n2vc_info = {}
1508 tasks_dict_info = {} # from task to info text
1509 exc = None
1510 error_list = []
1511 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1512 # ^ stage, step, VIM progress
1513 try:
1514 # wait for any previous tasks in process
1515 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1516
1517 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1518 stage[1] = "Reading from database,"
1519 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1520 db_nsr_update["detailed-status"] = "creating"
1521 db_nsr_update["operational-status"] = "init"
1522 self._write_ns_status(
1523 nsr_id=nsr_id,
1524 ns_state="BUILDING",
1525 current_operation="INSTANTIATING",
1526 current_operation_id=nslcmop_id,
1527 other_update=db_nsr_update
1528 )
1529 self._write_op_status(
1530 op_id=nslcmop_id,
1531 stage=stage,
1532 queuePosition=0
1533 )
1534
1535 # read from db: operation
1536 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1537 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1538 ns_params = db_nslcmop.get("operationParams")
1539 if ns_params and ns_params.get("timeout_ns_deploy"):
1540 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1541 else:
1542 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1543
1544 # read from db: ns
1545 stage[1] = "Getting nsr={} from db".format(nsr_id)
1546 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1547 # nsd is replicated into ns (no db read)
1548 nsd = db_nsr["nsd"]
1549 # nsr_name = db_nsr["name"] # TODO short-name??
1550
1551 # read from db: vnf's of this ns
1552 stage[1] = "Getting vnfrs from db"
1553 self.logger.debug(logging_text + stage[1])
1554 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1555
1556 # read from db: vnfd's for every vnf
1557 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1558 db_vnfds = {} # every vnfd data indexed by vnf id
1559 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1560
1561 # for each vnf in ns, read vnfd
1562 for vnfr in db_vnfrs_list:
1563 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1564 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1565 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1566 # if we haven't this vnfd, read it from db
1567 if vnfd_id not in db_vnfds:
1568 # read from db
1569 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1570 self.logger.debug(logging_text + stage[1])
1571 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1572
1573 # store vnfd
1574 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1575 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1576 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1577
1578 # Get or generates the _admin.deployed.VCA list
1579 vca_deployed_list = None
1580 if db_nsr["_admin"].get("deployed"):
1581 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1582 if vca_deployed_list is None:
1583 vca_deployed_list = []
1584 configuration_status_list = []
1585 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1586 db_nsr_update["configurationStatus"] = configuration_status_list
1587 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1588 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1589 elif isinstance(vca_deployed_list, dict):
1590 # maintain backward compatibility. Change a dict to list at database
1591 vca_deployed_list = list(vca_deployed_list.values())
1592 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1593 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1594
1595 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1596 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1597 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1598
1599 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1600 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1601 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1602
1603 # n2vc_redesign STEP 2 Deploy Network Scenario
1604 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1605 self._write_op_status(
1606 op_id=nslcmop_id,
1607 stage=stage
1608 )
1609
1610 stage[1] = "Deploying KDUs,"
1611 # self.logger.debug(logging_text + "Before deploy_kdus")
1612 # Call to deploy_kdus in case exists the "vdu:kdu" param
1613 await self.deploy_kdus(
1614 logging_text=logging_text,
1615 nsr_id=nsr_id,
1616 nslcmop_id=nslcmop_id,
1617 db_vnfrs=db_vnfrs,
1618 db_vnfds=db_vnfds,
1619 task_instantiation_info=tasks_dict_info,
1620 )
1621
1622 stage[1] = "Getting VCA public key."
1623 # n2vc_redesign STEP 1 Get VCA public ssh-key
1624 # feature 1429. Add n2vc public key to needed VMs
1625 n2vc_key = self.n2vc.get_public_key()
1626 n2vc_key_list = [n2vc_key]
1627 if self.vca_config.get("public_key"):
1628 n2vc_key_list.append(self.vca_config["public_key"])
1629
1630 stage[1] = "Deploying NS at VIM."
1631 task_ro = asyncio.ensure_future(
1632 self.instantiate_RO(
1633 logging_text=logging_text,
1634 nsr_id=nsr_id,
1635 nsd=nsd,
1636 db_nsr=db_nsr,
1637 db_nslcmop=db_nslcmop,
1638 db_vnfrs=db_vnfrs,
1639 db_vnfds_ref=db_vnfds_ref,
1640 n2vc_key_list=n2vc_key_list,
1641 stage=stage
1642 )
1643 )
1644 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1645 tasks_dict_info[task_ro] = "Deploying at VIM"
1646
1647 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1648 stage[1] = "Deploying Execution Environments."
1649 self.logger.debug(logging_text + stage[1])
1650
1651 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1652 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1653 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1654 vnfd_id = c_vnf["vnfd-id-ref"]
1655 vnfd = db_vnfds_ref[vnfd_id]
1656 member_vnf_index = str(c_vnf["member-vnf-index"])
1657 db_vnfr = db_vnfrs[member_vnf_index]
1658 base_folder = vnfd["_admin"]["storage"]
1659 vdu_id = None
1660 vdu_index = 0
1661 vdu_name = None
1662 kdu_name = None
1663
1664 # Get additional parameters
1665 deploy_params = {}
1666 if db_vnfr.get("additionalParamsForVnf"):
1667 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1668
1669 descriptor_config = vnfd.get("vnf-configuration")
1670 if descriptor_config and descriptor_config.get("juju"):
1671 self._deploy_n2vc(
1672 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1673 db_nsr=db_nsr,
1674 db_vnfr=db_vnfr,
1675 nslcmop_id=nslcmop_id,
1676 nsr_id=nsr_id,
1677 nsi_id=nsi_id,
1678 vnfd_id=vnfd_id,
1679 vdu_id=vdu_id,
1680 kdu_name=kdu_name,
1681 member_vnf_index=member_vnf_index,
1682 vdu_index=vdu_index,
1683 vdu_name=vdu_name,
1684 deploy_params=deploy_params,
1685 descriptor_config=descriptor_config,
1686 base_folder=base_folder,
1687 task_instantiation_info=tasks_dict_info,
1688 stage=stage
1689 )
1690
1691 # Deploy charms for each VDU that supports one.
1692 for vdud in get_iterable(vnfd, 'vdu'):
1693 vdu_id = vdud["id"]
1694 descriptor_config = vdud.get('vdu-configuration')
1695 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1696 if vdur.get("additionalParams"):
1697 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1698 else:
1699 deploy_params_vdu = deploy_params
1700 if descriptor_config and descriptor_config.get("juju"):
1701 # look for vdu index in the db_vnfr["vdu"] section
1702 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1703 # if vdur["vdu-id-ref"] == vdu_id:
1704 # break
1705 # else:
1706 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1707 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1708 # vdu_name = vdur.get("name")
1709 vdu_name = None
1710 kdu_name = None
1711 for vdu_index in range(int(vdud.get("count", 1))):
1712 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1713 self._deploy_n2vc(
1714 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1715 member_vnf_index, vdu_id, vdu_index),
1716 db_nsr=db_nsr,
1717 db_vnfr=db_vnfr,
1718 nslcmop_id=nslcmop_id,
1719 nsr_id=nsr_id,
1720 nsi_id=nsi_id,
1721 vnfd_id=vnfd_id,
1722 vdu_id=vdu_id,
1723 kdu_name=kdu_name,
1724 member_vnf_index=member_vnf_index,
1725 vdu_index=vdu_index,
1726 vdu_name=vdu_name,
1727 deploy_params=deploy_params_vdu,
1728 descriptor_config=descriptor_config,
1729 base_folder=base_folder,
1730 task_instantiation_info=tasks_dict_info,
1731 stage=stage
1732 )
1733 for kdud in get_iterable(vnfd, 'kdu'):
1734 kdu_name = kdud["name"]
1735 descriptor_config = kdud.get('kdu-configuration')
1736 if descriptor_config and descriptor_config.get("juju"):
1737 vdu_id = None
1738 vdu_index = 0
1739 vdu_name = None
1740 # look for vdu index in the db_vnfr["vdu"] section
1741 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1742 # if vdur["vdu-id-ref"] == vdu_id:
1743 # break
1744 # else:
1745 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1746 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1747 # vdu_name = vdur.get("name")
1748 # vdu_name = None
1749
1750 self._deploy_n2vc(
1751 logging_text=logging_text,
1752 db_nsr=db_nsr,
1753 db_vnfr=db_vnfr,
1754 nslcmop_id=nslcmop_id,
1755 nsr_id=nsr_id,
1756 nsi_id=nsi_id,
1757 vnfd_id=vnfd_id,
1758 vdu_id=vdu_id,
1759 kdu_name=kdu_name,
1760 member_vnf_index=member_vnf_index,
1761 vdu_index=vdu_index,
1762 vdu_name=vdu_name,
1763 deploy_params=deploy_params,
1764 descriptor_config=descriptor_config,
1765 base_folder=base_folder,
1766 task_instantiation_info=tasks_dict_info,
1767 stage=stage
1768 )
1769
1770 # Check if this NS has a charm configuration
1771 descriptor_config = nsd.get("ns-configuration")
1772 if descriptor_config and descriptor_config.get("juju"):
1773 vnfd_id = None
1774 db_vnfr = None
1775 member_vnf_index = None
1776 vdu_id = None
1777 kdu_name = None
1778 vdu_index = 0
1779 vdu_name = None
1780
1781 # Get additional parameters
1782 deploy_params = {}
1783 if db_nsr.get("additionalParamsForNs"):
1784 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1785 base_folder = nsd["_admin"]["storage"]
1786 self._deploy_n2vc(
1787 logging_text=logging_text,
1788 db_nsr=db_nsr,
1789 db_vnfr=db_vnfr,
1790 nslcmop_id=nslcmop_id,
1791 nsr_id=nsr_id,
1792 nsi_id=nsi_id,
1793 vnfd_id=vnfd_id,
1794 vdu_id=vdu_id,
1795 kdu_name=kdu_name,
1796 member_vnf_index=member_vnf_index,
1797 vdu_index=vdu_index,
1798 vdu_name=vdu_name,
1799 deploy_params=deploy_params,
1800 descriptor_config=descriptor_config,
1801 base_folder=base_folder,
1802 task_instantiation_info=tasks_dict_info,
1803 stage=stage
1804 )
1805
1806 # rest of staff will be done at finally
1807
1808 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1809 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1810 exc = e
1811 except asyncio.CancelledError:
1812 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1813 exc = "Operation was cancelled"
1814 except Exception as e:
1815 exc = traceback.format_exc()
1816 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1817 finally:
1818 if exc:
1819 error_list.append(str(exc))
1820 try:
1821 # wait for pending tasks
1822 if tasks_dict_info:
1823 stage[1] = "Waiting for instantiate pending tasks."
1824 self.logger.debug(logging_text + stage[1])
1825 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1826 stage, nslcmop_id, nsr_id=nsr_id)
1827 stage[1] = stage[2] = ""
1828 except asyncio.CancelledError:
1829 error_list.append("Cancelled")
1830 # TODO cancel all tasks
1831 except Exception as exc:
1832 error_list.append(str(exc))
1833
1834 # update operation-status
1835 db_nsr_update["operational-status"] = "running"
1836 # let's begin with VCA 'configured' status (later we can change it)
1837 db_nsr_update["config-status"] = "configured"
1838 for task, task_name in tasks_dict_info.items():
1839 if not task.done() or task.cancelled() or task.exception():
1840 if task_name.startswith(self.task_name_deploy_vca):
1841 # A N2VC task is pending
1842 db_nsr_update["config-status"] = "failed"
1843 else:
1844 # RO or KDU task is pending
1845 db_nsr_update["operational-status"] = "failed"
1846
1847 # update status at database
1848 if error_list:
1849 error_detail = ". ".join(error_list)
1850 self.logger.error(logging_text + error_detail)
1851 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1852 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1853
1854 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1855 db_nslcmop_update["detailed-status"] = error_detail
1856 nslcmop_operation_state = "FAILED"
1857 ns_state = "BROKEN"
1858 else:
1859 error_detail = None
1860 error_description_nsr = error_description_nslcmop = None
1861 ns_state = "READY"
1862 db_nsr_update["detailed-status"] = "Done"
1863 db_nslcmop_update["detailed-status"] = "Done"
1864 nslcmop_operation_state = "COMPLETED"
1865
1866 if db_nsr:
1867 self._write_ns_status(
1868 nsr_id=nsr_id,
1869 ns_state=ns_state,
1870 current_operation="IDLE",
1871 current_operation_id=None,
1872 error_description=error_description_nsr,
1873 error_detail=error_detail,
1874 other_update=db_nsr_update
1875 )
1876 if db_nslcmop:
1877 self._write_op_status(
1878 op_id=nslcmop_id,
1879 stage="",
1880 error_message=error_description_nslcmop,
1881 operation_state=nslcmop_operation_state,
1882 other_update=db_nslcmop_update,
1883 )
1884
1885 if nslcmop_operation_state:
1886 try:
1887 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1888 "operationState": nslcmop_operation_state},
1889 loop=self.loop)
1890 except Exception as e:
1891 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1892
1893 self.logger.debug(logging_text + "Exit")
1894 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1895
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations in which the VCA stored at _admin.deployed.VCA.<vca_index> takes part.

    Steps:
      1. find all relations of this VCA, declared at the 'ns-configuration' stored in the ns record and at
         the 'vnf-configuration' of every vnfd referenced by the ns record
      2. wait until both peers of each relation have their configuration software installed
      3. add the relation through N2VC. A relation with a peer at 'BROKEN' status is discarded

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the ns record at database
    :param vca_index: index of this VCA at the _admin.deployed.VCA list of the ns record
    :param timeout: maximum time, in seconds, waiting for the peers
    :return: True when there are no relations left to add (none defined, all added or discarded);
        False on timeout or on any error (the error is logged). A cancellation is propagated
    """

    def _takes_part(relation, entity_id):
        # a relation has two entities; the VCA takes part if 'entity_id' is the id of any of them
        entities = relation.get('entities')
        return entity_id in (entities[0].get('id'), entities[1].get('id'))

    async def _add_ready_relations(pending, vca_key, db_nsr):
        # Add every relation of 'pending' whose two peers are ready and remove it from 'pending'.
        # Relations with a broken peer are removed too.
        # 'vca_key' is the VCA field compared against the entity id of the relation:
        # 'member-vnf-index' for ns relations, 'vdu_id' for vnf relations
        if not pending:
            return
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        vca_status_list = db_nsr.get('configurationStatus') or ()

        # iterate over a copy, because entries are removed from 'pending' inside the loop
        for relation in list(pending):
            from_entity = relation.get('entities')[0]
            to_entity = relation.get('entities')[1]
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # empty positions and VCAs without the software installed are not ready peers
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_entity.get('id'):
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_entity.get('id'):
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=from_entity.get('endpoint'),
                    endpoint_2=to_entity.get('endpoint'))
                # remove entry from relations list
                pending.remove(relation)
                continue

            # check failed peers. It is a best-effort check: a malformed status list is not an error
            peer_ids = (from_entity.get('id'), to_entity.get('id'))
            try:
                peer_broken = any(
                    vca and vca_status and vca.get(vca_key) in peer_ids and vca_status.get('status') == 'BROKEN'
                    for vca, vca_status in zip(vca_list, vca_status_list))
            except (AttributeError, TypeError) as e:
                self.logger.debug(logging_text + ' cannot check the status of the peers: {}'.format(e))
                peer_broken = False
            if peer_broken:
                # peer broken: remove relation from list (only once, even if both peers are broken)
                pending.remove(relation)

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is one of the entities
        ns_relations = [r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
                        if _takes_part(r, my_vca.get('member-vnf-index'))]

        # read all vnf-configuration relations where this VCA is one of the entities
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ():
                if _takes_part(r, my_vca.get('vdu_id')):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: wait for the peers and add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (other tasks update the record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            await _add_ready_relations(ns_relations, 'member-vnf-index', db_nsr)
            await _add_ready_relations(vnf_relations, 'vdu_id', db_nsr)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(5.0)

    except asyncio.CancelledError:
        # a cancellation is not a failure adding relations; let the caller manage it
        raise
    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2055
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the deployment of the KDUs present at the vnf records.

    For every 'kdur' entry, its information is stored at db_nsr._admin.deployed.K8s.<index> and an asyncio
    task installing the KDU is created, registered at self.lcm_tasks and added to 'task_instantiation_info'.
    The created tasks are not awaited here.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the ns record
    :param nslcmop_id: _id of the operation, used to register the created tasks
    :param db_vnfrs: dictionary with the vnf records; only its values are used
    :param db_vnfds: dictionary with the vnfds, indexed by the vnfd _id
    :param task_instantiation_info: dictionary where every created task is added as key, with a text
        describing it as value
    :return: None. LcmException is raised on error; asyncio.CancelledError is propagated
    """
    # cluster _id => identifier stored at the k8scluster record for each type. Avoids reading it twice
    cluster_uuid_cache = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        known_clusters = cluster_uuid_cache[cluster_type]
        if cluster_id in known_clusters:
            return known_clusters[cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        known_clusters[cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        # start with an empty list of deployed KDUs at database
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        kdu_index = 0
        synchronized_clusters = []  # helm clusters whose repos have been already synchronized

        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))

                # the kdu type is the first of these keys with content at the kdur
                for k8sclustertype in ("helm-chart", "juju-bundle"):
                    if kdur.get(k8sclustertype):
                        kdumodel = kdur[k8sclustertype]
                        break
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))

                # when kdumodel is a file or folder of the package, use its path at the file system
                try:
                    # path format: /vnfdid/pkkdir/kdumodel
                    filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
                    is_file = self.fs.file_exists(filename, mode='file')
                    if is_file or self.fs.file_exists(filename, mode='dir'):
                        kdumodel = self.fs.path + filename
                except asyncio.CancelledError:
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # repos of a helm cluster are synchronized only once per call
                if k8sclustertype == "helm-chart" and cluster_uuid not in synchronized_clusters:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                        updated = {'_admin.helm_charts_added.' + item: name
                                   for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                                         "to_add: {}".format(k8s_cluster_id, del_repo_list,
                                                                             added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                    synchronized_clusters.append(cluster_uuid)

                member_vnf_index = vnfr_data["member-vnf-index-ref"]
                kdu_name = kdur["kdu-name"]
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(member_vnf_index, kdu_name,
                                                                          k8s_cluster_id)

                # store the deployment information before launching the installation
                deployed_path = "_admin.deployed.K8s.{}".format(kdu_index)
                db_nsr_update[deployed_path] = {
                    "kdu-instance": None,
                    "k8scluster-uuid": cluster_uuid,
                    "k8scluster-type": k8sclustertype,
                    "member-vnf-index": member_vnf_index,
                    "kdu-name": kdu_name,
                    "kdu-model": kdumodel,
                }
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # database location handed to the connector at install
                db_dict = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(kdu_index)}

                k8s_connector = self.k8scluster_map[k8sclustertype]
                task_kdu = asyncio.ensure_future(
                    k8s_connector.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                          atomic=True, params=desc_params,
                                          db_dict=db_dict, timeout=600,
                                          kdu_name=kdur["kdu-name"]))

                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(kdu_index), task_kdu)
                task_instantiation_info[task_kdu] = "Deploying KDU {}".format(kdur["kdu-name"])

                kdu_index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # any other error is reported as LcmException, indicating the step where it happened
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if not isinstance(e, (N2VCException, DbException)):
            self.logger.critical(logging_text + msg, exc_info=True)
        else:
            self.logger.error(logging_text + msg)
        raise LcmException(msg)
    finally:
        # write whatever is still pending
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2166
2167 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2168 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2169 base_folder, task_instantiation_info, stage):
2170 # launch instantiate_N2VC in a asyncio task and register task object
2171 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2172 # if not found, create one entry and update database
2173
2174 # fill db_nsr._admin.deployed.VCA.<index>
2175 vca_index = -1
2176 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2177 if not vca_deployed:
2178 continue
2179 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2180 vca_deployed.get("vdu_id") == vdu_id and \
2181 vca_deployed.get("kdu_name") == kdu_name and \
2182 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2183 break
2184 else:
2185 # not found, create one.
2186 vca_deployed = {
2187 "member-vnf-index": member_vnf_index,
2188 "vdu_id": vdu_id,
2189 "kdu_name": kdu_name,
2190 "vdu_count_index": vdu_index,
2191 "operational-status": "init", # TODO revise
2192 "detailed-status": "", # TODO revise
2193 "step": "initial-deploy", # TODO revise
2194 "vnfd_id": vnfd_id,
2195 "vdu_name": vdu_name,
2196 }
2197 vca_index += 1
2198
2199 # create VCA and configurationStatus in db
2200 db_dict = {
2201 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2202 "configurationStatus.{}".format(vca_index): dict()
2203 }
2204 self.update_db_2("nsrs", nsr_id, db_dict)
2205
2206 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2207
2208 # Launch task
2209 task_n2vc = asyncio.ensure_future(
2210 self.instantiate_N2VC(
2211 logging_text=logging_text,
2212 vca_index=vca_index,
2213 nsi_id=nsi_id,
2214 db_nsr=db_nsr,
2215 db_vnfr=db_vnfr,
2216 vdu_id=vdu_id,
2217 kdu_name=kdu_name,
2218 vdu_index=vdu_index,
2219 deploy_params=deploy_params,
2220 config_descriptor=descriptor_config,
2221 base_folder=base_folder,
2222 nslcmop_id=nslcmop_id,
2223 stage=stage
2224 )
2225 )
2226 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2227 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2228 member_vnf_index or "", vdu_id or "")
2229
2230 # Check if this VNFD has a configured terminate action
2231 def _has_terminate_config_primitive(self, vnfd):
2232 vnf_config = vnfd.get("vnf-configuration")
2233 if vnf_config and vnf_config.get("terminate-config-primitive"):
2234 return True
2235 else:
2236 return False
2237
2238 @staticmethod
2239 def _get_terminate_config_primitive_seq_list(vnfd):
2240 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2241 # No need to check for existing primitive twice, already done before
2242 vnf_config = vnfd.get("vnf-configuration")
2243 seq_list = vnf_config.get("terminate-config-primitive")
2244 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2245 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2246 return seq_list_sorted
2247
2248 @staticmethod
2249 def _create_nslcmop(nsr_id, operation, params):
2250 """
2251 Creates a ns-lcm-opp content to be stored at database.
2252 :param nsr_id: internal id of the instance
2253 :param operation: instantiate, terminate, scale, action, ...
2254 :param params: user parameters for the operation
2255 :return: dictionary following SOL005 format
2256 """
2257 # Raise exception if invalid arguments
2258 if not (nsr_id and operation and params):
2259 raise LcmException(
2260 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2261 now = time()
2262 _id = str(uuid4())
2263 nslcmop = {
2264 "id": _id,
2265 "_id": _id,
2266 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2267 "operationState": "PROCESSING",
2268 "statusEnteredTime": now,
2269 "nsInstanceId": nsr_id,
2270 "lcmOperationType": operation,
2271 "startTime": now,
2272 "isAutomaticInvocation": False,
2273 "operationParams": params,
2274 "isCancelPending": False,
2275 "links": {
2276 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2277 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2278 }
2279 }
2280 return nslcmop
2281
2282 def _format_additional_params(self, params):
2283 params = params or {}
2284 for key, value in params.items():
2285 if str(value).startswith("!!yaml "):
2286 params[key] = yaml.safe_load(value[7:])
2287 return params
2288
2289 def _get_terminate_primitive_params(self, seq, vnf_index):
2290 primitive = seq.get('name')
2291 primitive_params = {}
2292 params = {
2293 "member_vnf_index": vnf_index,
2294 "primitive": primitive,
2295 "primitive_params": primitive_params,
2296 }
2297 desc_params = {}
2298 return self._map_primitive_params(seq, params, desc_params)
2299
2300 # sub-operations
2301
2302 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2303 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2304 if (op.get('operationState') == 'COMPLETED'):
2305 # b. Skip sub-operation
2306 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2307 return self.SUBOPERATION_STATUS_SKIP
2308 else:
2309 # c. Reintent executing sub-operation
2310 # The sub-operation exists, and operationState != 'COMPLETED'
2311 # Update operationState = 'PROCESSING' to indicate a reintent.
2312 operationState = 'PROCESSING'
2313 detailed_status = 'In progress'
2314 self._update_suboperation_status(
2315 db_nslcmop, op_index, operationState, detailed_status)
2316 # Return the sub-operation index
2317 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2318 # with arguments extracted from the sub-operation
2319 return op_index
2320
2321 # Find a sub-operation where all keys in a matching dictionary must match
2322 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2323 def _find_suboperation(self, db_nslcmop, match):
2324 if (db_nslcmop and match):
2325 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2326 for i, op in enumerate(op_list):
2327 if all(op.get(k) == match[k] for k in match):
2328 return i
2329 return self.SUBOPERATION_STATUS_NOT_FOUND
2330
2331 # Update status for a sub-operation given its index
2332 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2333 # Update DB for HA tasks
2334 q_filter = {'_id': db_nslcmop['_id']}
2335 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2336 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2337 self.db.set_one("nslcmops",
2338 q_filter=q_filter,
2339 update_dict=update_dict,
2340 fail_on_empty=False)
2341
2342 # Add sub-operation, return the index of the added sub-operation
2343 # Optionally, set operationState, detailed-status, and operationType
2344 # Status and type are currently set for 'scale' sub-operations:
2345 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2346 # 'detailed-status' : status message
2347 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2348 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2349 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2350 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2351 RO_nsr_id=None, RO_scaling_info=None):
2352 if not db_nslcmop:
2353 return self.SUBOPERATION_STATUS_NOT_FOUND
2354 # Get the "_admin.operations" list, if it exists
2355 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2356 op_list = db_nslcmop_admin.get('operations')
2357 # Create or append to the "_admin.operations" list
2358 new_op = {'member_vnf_index': vnf_index,
2359 'vdu_id': vdu_id,
2360 'vdu_count_index': vdu_count_index,
2361 'primitive': primitive,
2362 'primitive_params': mapped_primitive_params}
2363 if operationState:
2364 new_op['operationState'] = operationState
2365 if detailed_status:
2366 new_op['detailed-status'] = detailed_status
2367 if operationType:
2368 new_op['lcmOperationType'] = operationType
2369 if RO_nsr_id:
2370 new_op['RO_nsr_id'] = RO_nsr_id
2371 if RO_scaling_info:
2372 new_op['RO_scaling_info'] = RO_scaling_info
2373 if not op_list:
2374 # No existing operations, create key 'operations' with current operation as first list element
2375 db_nslcmop_admin.update({'operations': [new_op]})
2376 op_list = db_nslcmop_admin.get('operations')
2377 else:
2378 # Existing operations, append operation to list
2379 op_list.append(new_op)
2380
2381 db_nslcmop_update = {'_admin.operations': op_list}
2382 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2383 op_index = len(op_list) - 1
2384 return op_index
2385
2386 # Helper methods for scale() sub-operations
2387
2388 # pre-scale/post-scale:
2389 # Check for 3 different cases:
2390 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2391 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2392 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Find the scale sub-operation described by the arguments; register it when it does not exist yet.

    :param db_nslcmop: nslcmop record where the sub-operation is searched and, when missing, added
    :param vnf_index: member vnf index the sub-operation refers to
    :param vnf_config_primitive: primitive name (ignored for RO scaling)
    :param primitive_params: primitive parameters (ignored for RO scaling)
    :param operationType: lcm operation type; forced to 'SCALE-RO' when both RO arguments are provided
    :param RO_nsr_id: RO identifier of the ns, only meaningful together with RO_scaling_info
    :param RO_scaling_info: RO scaling data, only meaningful together with RO_nsr_id
    :return: self.SUBOPERATION_STATUS_NEW when the sub-operation has just been added, otherwise the result of
        self._reintent_or_skip_suboperation() for the existing one
    """
    # RO scaling is only considered when both RO arguments carry a value
    ro_scaling = bool(RO_nsr_id and RO_scaling_info)
    if ro_scaling:
        operationType = 'SCALE-RO'
        wanted = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        wanted = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }

    found_index = self._find_suboperation(db_nslcmop, wanted)
    if found_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Existing sub-operation: the helper decides between skipping it and re-executing it
        return self._reintent_or_skip_suboperation(db_nslcmop, found_index)

    # New sub-operation: keep only the data that belongs to the kind of scaling being registered
    if ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None

    # vdu_id, vdu_count_index and vdu_name are None for every kind of scaling;
    # the initial status is 'PROCESSING' / 'In progress'
    self._add_suboperation(db_nslcmop, vnf_index, None, None, None,
                           vnf_config_primitive, primitive_params,
                           'PROCESSING', 'In progress',
                           operationType, RO_nsr_id, RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2446
2447 # Function to return execution_environment id
2448
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id of the first deployed VCA matching vnf_index and vdu_id.

    :param vnf_index: value compared against the "member-vnf-index" of every entry
    :param vdu_id: value compared against the "vdu_id" of every entry
    :param vca_deployed_list: iterable of dictionaries with "member-vnf-index", "vdu_id" and "ee_id"
    :return: the "ee_id" of the first matching entry, or None when nothing matches
    """
    # TODO vdu_index_count
    matching_ee_ids = (
        entry["ee_id"]
        for entry in vca_deployed_list
        if entry["member-vnf-index"] == vnf_index and entry["vdu_id"] == vdu_id
    )
    return next(matching_ee_ids, None)
2454
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is used to update nsrs
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. It can be None
        when the element has no configuration; in that case no terminate primitive is executed
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy the execution environment here, because all of them will be
        destroyed at once later
    :return: None or exception
    :raises LcmException: when a terminate primitive ends with a result other than COMPLETED/PARTIALLY_COMPLETED
    """
    # a missing descriptor simply means that there are no terminate primitives to execute
    terminate_primitives = (config_descriptor or {}).get("terminate-config-primitive")
    vdu_id = vca_deployed.get("vdu_id")
    vdu_count_index = vca_deployed.get("vdu_count_index")
    vdu_name = vca_deployed.get("vdu_name")
    vnf_index = vca_deployed.get("member-vnf-index")
    if terminate_primitives and vca_deployed.get("needed_terminate"):
        result_ok = ('COMPLETED', 'PARTIALLY_COMPLETED')
        # Execute the primitives ordered numerically by its 'seq' tag, ascending
        for seq in sorted(terminate_primitives, key=lambda x: int(x['seq'])):
            # Create the primitive for each sequence, i.e. "primitive": "touch"
            primitive = seq.get('name')
            step = "Calling terminate action for vnf_member_index={} primitive={}".format(vnf_index, primitive)
            self.logger.debug(logging_text + step)
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

            # Add sub-operation; vdu_id, vdu_count_index and vdu_name come from the deployed VCA record
            self._add_suboperation(db_nslcmop,
                                   vnf_index,
                                   vdu_id,
                                   vdu_count_index,
                                   vdu_name,
                                   primitive,
                                   mapped_primitive_params)
            # Sub-operations: Call _ns_execute_primitive() instead of action()
            try:
                result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                         mapped_primitive_params)
            except LcmException:
                # this happens when VCA is not deployed. In this case it is not needed to terminate
                continue
            if result not in result_ok:
                raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                   "error {}".format(primitive, vnf_index, result_detail))
        # mark at database that this VCA does not need to execute terminate primitives any more
        db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
        self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if destroy_ee:
        await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2511
async def _delete_N2VC(self, nsr_id: str):
    """
    Delete the whole N2VC namespace of a ns, annotating the configuration status before and after.

    :param nsr_id: ns instance id; the namespace deleted is "." followed by this id
    :return: None
    """
    write_status = self._write_all_config_status
    write_status(nsr_id=nsr_id, status='TERMINATING')
    # the deletion is bounded by the class level charm deletion timeout
    await self.n2vc.delete_namespace(namespace="." + nsr_id, total_timeout=self.timeout_charm_delete)
    write_status(nsr_id=nsr_id, status='DELETED')
2517
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: ns instance id, used to update the "nsrs" collection
    :param nslcmop_id: operation id, used to write the operation status
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with all the collected failures when the ns, the nsd or some vnfd cannot be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            poll_interval = 5  # seconds between two consecutive queries to RO
            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit raise instead of 'assert': asserts are removed when python runs with -O
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    # write to database only when the reported status changes
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no 'loop' argument: it was removed from asyncio.sleep in python 3.10
                await asyncio.sleep(poll_interval)
                delete_timeout -= poll_interval
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            # already deleted at RO; this is not considered a failure
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd; skipped when the ns deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds; skipped when some previous deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2660
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance: execute the terminate primitives, delete the execution environments, uninstall
    the KDUs and remove the deployment from RO. The result is written at the "nsrs" and "nslcmops" records
    and a "terminated" message is sent on the "ns" topic.

    Parameters read from the operationParams of the nslcmop:
        timeout_ns_terminate: overrides the class default global timeout
        skip_terminate_primitives: when true, stage 2 (terminate primitives) is not executed
        autoremove: forwarded inside the "terminated" message

    :param nsr_id: ns instance id
    :param nslcmop_id: id of the terminate operation
    :return: None. Errors are not raised but annotated at database
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}  # task -> text describing it; pending ones are awaited in the finally clause
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to undeploy; the finally clause still runs and, without errors, marks the operation COMPLETED
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}  # cache, so that each vnfd is read from database only once
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        if not operation_params.get("skip_terminate_primitives"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
                    continue
                # select the descriptor (ns, vdu, kdu or vnf level) this VCA belongs to.
                # NOTE: it remains None when the vdu/kdu is not found at the vnfd
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                # destroy_ee=False: execution environments are deleted all at once at stage 3
                task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
                                                               vca_index, False))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                if error_list:
                    # stage 3 is not executed; the finally clause marks the operation as FAILED
                    return   # raise LcmException("; ".join(error_list))
                tasks_dict_info.clear()

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."
        stage[1] = "Deleting all execution environments."
        self.logger.debug(logging_text + stage[1])

        task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_N2VC(nsr_id=nsr_id),
                                                                timeout=self.timeout_charm_delete))
        # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
        tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                # unknown cluster types are only logged; they do not make the operation fail
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        task_delete_ro = asyncio.ensure_future(
            self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected exception: the whole traceback is stored as the error
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as exc:
            # NOTE: this rebinds the outer 'exc'; it is not read again after this point
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # records are updated only when they were successfully read at the beginning
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        if db_nslcmop:
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            # notify the result; a failure sending the message is only logged
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2875
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
    """
    Wait until all the provided tasks finish or the global timeout expires, reporting the progress.

    :param logging_text: prefix added to every log message
    :param created_tasks_info: dictionary with the tasks to wait for as keys and a text describing each as value
    :param timeout: global time in seconds, counted from the moment this method is called
    :param stage: list of strings; index 1 is overwritten with "<done>/<total>." plus the errors, and the list
        is written with self._write_op_status()
    :param nslcmop_id: operation id passed to self._write_op_status()
    :param nsr_id: if provided, errors are also annotated at this "nsrs" record
    :return: list with one text per failed, cancelled or timed-out task; empty if all of them finish well
    """
    started_at = time()
    detailed_errors = []
    failed_descriptions = []
    remaining = list(created_tasks_info.keys())
    total = len(remaining)
    finished = 0
    stage[1] = "{}/{}.".format(finished, total)
    self._write_op_status(nslcmop_id, stage)
    while remaining:
        latest_error = None
        time_left = timeout + started_at - time()
        completed, remaining = await asyncio.wait(remaining, timeout=time_left,
                                                  return_when=asyncio.FIRST_COMPLETED)
        finished += len(completed)
        if not completed:  # Timeout
            # every task still pending is annotated as timed out, and the wait is over
            for task in remaining:
                latest_error = created_tasks_info[task] + ": Timeout"
                detailed_errors.append(latest_error)
                failed_descriptions.append(latest_error)
            break
        for task in completed:
            description = created_tasks_info[task]
            failure = "Cancelled" if task.cancelled() else task.exception()
            if not failure:
                self.logger.debug(logging_text + description + ": Done")
                continue
            if isinstance(failure, asyncio.TimeoutError):
                failure = "Timeout"
            latest_error = description + ": {}".format(failure)
            failed_descriptions.append(description)
            detailed_errors.append(latest_error)
            if isinstance(failure, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
                self.logger.error(logging_text + latest_error)
            else:
                # any other exception type is logged together with its traceback
                trace_text = "".join(traceback.format_exception(None, failure, failure.__traceback__))
                self.logger.error(logging_text + description + trace_text)
        stage[1] = "{}/{}.".format(finished, total)
        if latest_error:
            stage[1] += " Errors: " + ". ".join(detailed_errors) + "."
            if nsr_id:  # update also nsr
                self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(failed_descriptions),
                                                  "errorDetail": ". ".join(detailed_errors)})
        self._write_op_status(nslcmop_id, stage)
    return detailed_errors
2923
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the "value" or, if missing, the "default-value" of the descriptor is used. If this is a text between < >,
    it is looked for at instantiation_params. Resulting dict, list or tuple values are dumped to a yaml text,
    and the prefix "!!yaml " is removed from text values.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is considered as empty
    :param instantiation_params: Instantiation params provided by user. None is considered as empty
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither a user value nor a default, or if the instantiation
        parameter referenced between < > is not provided
    """
    # tolerate callers that provide None when there is nothing to provide
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter") or ():
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" takes precedence over "default-value"
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                # reference to an instantiation parameter
                reference = value[1:-1]
                if reference not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[reference]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
2966
def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
    """
    Find the vca_deployed record for this action and return its execution environment id.

    :param deployed_vca: iterable with the deployed VCA records; empty (falsy) entries are ignored
    :param member_vnf_index: must be equal to the "member-vnf-index" of the record
    :param vdu_id: must be equal to the "vdu_id" of the record
    :param vdu_count_index: if not None, must be equal to the "vdu_count_index" of the record
    :param kdu_name: if provided (not empty), must be equal to the "kdu_name" of the record
    :return: the "ee_id" of the first record that matches
    :raises LcmException: if no record matches, or the matching one has not any execution environment id
    """
    def is_requested(record):
        if not record:
            return False
        if member_vnf_index != record["member-vnf-index"] or vdu_id != record["vdu_id"]:
            return False
        if vdu_count_index is not None and vdu_count_index != record["vdu_count_index"]:
            return False
        return not (kdu_name and kdu_name != record["kdu_name"])

    selected = next((record for record in deployed_vca if is_requested(record)), None)
    if selected is None:
        # vca_deployed not found
        raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
                           "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))

    # get ee_id
    ee_id = selected.get("ee_id")
    if ee_id:
        return ee_id
    raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
                       "execution environment"
                       .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
2991
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
                                retries_interval=30, timeout=None) -> (str, str):
    """
    Execute a primitive on an execution environment through N2VC, retrying on failure.

    :param ee_id: execution environment id
    :param primitive: name of the primitive. For "config", the parameters are sent under the key "params"
    :param primitive_params: dictionary with the parameters of the primitive
    :param retries: number of additional attempts after a failed execution
    :param retries_interval: seconds to wait before every new attempt
    :param timeout: maximum time in seconds for each attempt. If None or 0, self.timeout_primitive is used.
        The same value is passed to N2VC as total_timeout, so a value bigger than the default is honoured too
    :return: ('COMPLETED', <output of the primitive>) or ('FAILED', <error text>)
    :raises LcmException: propagated as it is
    :raises asyncio.CancelledError: propagated as it is
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}
        total_timeout = timeout or self.timeout_primitive

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.n2vc.exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=total_timeout),
                    timeout=total_timeout)
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                error = "Timeout" if isinstance(e, asyncio.TimeoutError) else e
                retries -= 1
                if retries < 0:
                    return 'FAILED', str(error)
                self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, error))
                # wait and retry. No 'loop' argument: it was removed from asyncio.sleep in python 3.10
                await asyncio.sleep(retries_interval)

        return 'COMPLETED', output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # 'FAILED' is the state callers compare against; any other text would not be recognized as a failure
        return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
3029
3030 async def action(self, nsr_id, nslcmop_id):
3031
3032 # Try to lock HA task here
3033 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3034 if not task_is_locked_by_me:
3035 return
3036
3037 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3038 self.logger.debug(logging_text + "Enter")
3039 # get all needed from database
3040 db_nsr = None
3041 db_nslcmop = None
3042 db_nsr_update = {}
3043 db_nslcmop_update = {}
3044 nslcmop_operation_state = None
3045 error_description_nslcmop = None
3046 exc = None
3047 try:
3048 # wait for any previous tasks in process
3049 step = "Waiting for previous operations to terminate"
3050 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3051
3052 self._write_ns_status(
3053 nsr_id=nsr_id,
3054 ns_state=None,
3055 current_operation="RUNNING ACTION",
3056 current_operation_id=nslcmop_id
3057 )
3058
3059 step = "Getting information from database"
3060 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3061 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3062
3063 nsr_deployed = db_nsr["_admin"].get("deployed")
3064 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3065 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3066 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3067 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3068 primitive = db_nslcmop["operationParams"]["primitive"]
3069 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3070 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3071
3072 if vnf_index:
3073 step = "Getting vnfr from database"
3074 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3075 step = "Getting vnfd from database"
3076 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3077 else:
3078 step = "Getting nsd from database"
3079 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3080
3081 # for backward compatibility
3082 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3083 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3084 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3085 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3086
3087 # look for primitive
3088 config_primitive_desc = None
3089 if vdu_id:
3090 for vdu in get_iterable(db_vnfd, "vdu"):
3091 if vdu_id == vdu["id"]:
3092 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3093 if config_primitive["name"] == primitive:
3094 config_primitive_desc = config_primitive
3095 break
3096 break
3097 elif kdu_name:
3098 for kdu in get_iterable(db_vnfd, "kdu"):
3099 if kdu_name == kdu["name"]:
3100 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3101 if config_primitive["name"] == primitive:
3102 config_primitive_desc = config_primitive
3103 break
3104 break
3105 elif vnf_index:
3106 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3107 if config_primitive["name"] == primitive:
3108 config_primitive_desc = config_primitive
3109 break
3110 else:
3111 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3112 if config_primitive["name"] == primitive:
3113 config_primitive_desc = config_primitive
3114 break
3115
3116 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3117 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3118 format(primitive))
3119
3120 if vnf_index:
3121 if vdu_id:
3122 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3123 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3124 elif kdu_name:
3125 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3126 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3127 else:
3128 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3129 else:
3130 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3131
3132 # TODO check if ns is in a proper status
3133 if kdu_name and primitive in ("upgrade", "rollback", "status"):
3134 # kdur and desc_params already set from before
3135 if primitive_params:
3136 desc_params.update(primitive_params)
3137 # TODO Check if we will need something at vnf level
3138 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3139 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3140 break
3141 else:
3142 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3143
3144 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3145 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3146 raise LcmException(msg)
3147
3148 db_dict = {"collection": "nsrs",
3149 "filter": {"_id": nsr_id},
3150 "path": "_admin.deployed.K8s.{}".format(index)}
3151 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3152 step = "Executing kdu {}".format(primitive)
3153 if primitive == "upgrade":
3154 if desc_params.get("kdu_model"):
3155 kdu_model = desc_params.get("kdu_model")
3156 del desc_params["kdu_model"]
3157 else:
3158 kdu_model = kdu.get("kdu-model")
3159 parts = kdu_model.split(sep=":")
3160 if len(parts) == 2:
3161 kdu_model = parts[0]
3162
3163 detailed_status = await asyncio.wait_for(
3164 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3165 cluster_uuid=kdu.get("k8scluster-uuid"),
3166 kdu_instance=kdu.get("kdu-instance"),
3167 atomic=True, kdu_model=kdu_model,
3168 params=desc_params, db_dict=db_dict,
3169 timeout=timeout_ns_action),
3170 timeout=timeout_ns_action + 10)
3171 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3172 elif primitive == "rollback":
3173 detailed_status = await asyncio.wait_for(
3174 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3175 cluster_uuid=kdu.get("k8scluster-uuid"),
3176 kdu_instance=kdu.get("kdu-instance"),
3177 db_dict=db_dict),
3178 timeout=timeout_ns_action)
3179 elif primitive == "status":
3180 detailed_status = await asyncio.wait_for(
3181 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3182 cluster_uuid=kdu.get("k8scluster-uuid"),
3183 kdu_instance=kdu.get("kdu-instance")),
3184 timeout=timeout_ns_action)
3185
3186 if detailed_status:
3187 nslcmop_operation_state = 'COMPLETED'
3188 else:
3189 detailed_status = ''
3190 nslcmop_operation_state = 'FAILED'
3191
3192 else:
3193 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3194 self._look_for_deployed_vca(nsr_deployed["VCA"],
3195 member_vnf_index=vnf_index,
3196 vdu_id=vdu_id,
3197 vdu_count_index=vdu_count_index),
3198 primitive=primitive,
3199 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3200 timeout=timeout_ns_action)
3201
3202 db_nslcmop_update["detailed-status"] = detailed_status
3203 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3204 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3205 detailed_status))
3206 return # database update is called inside finally
3207
3208 except (DbException, LcmException, N2VCException) as e:
3209 self.logger.error(logging_text + "Exit Exception {}".format(e))
3210 exc = e
3211 except asyncio.CancelledError:
3212 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3213 exc = "Operation was cancelled"
3214 except asyncio.TimeoutError:
3215 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3216 exc = "Timeout"
3217 except Exception as e:
3218 exc = traceback.format_exc()
3219 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3220 finally:
3221 if exc:
3222 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3223 "FAILED {}: {}".format(step, exc)
3224 nslcmop_operation_state = "FAILED"
3225 if db_nsr:
3226 self._write_ns_status(
3227 nsr_id=nsr_id,
3228 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3229 current_operation="IDLE",
3230 current_operation_id=None,
3231 # error_description=error_description_nsr,
3232 # error_detail=error_detail,
3233 other_update=db_nsr_update
3234 )
3235
3236 if db_nslcmop:
3237 self._write_op_status(
3238 op_id=nslcmop_id,
3239 stage="",
3240 error_message=error_description_nslcmop,
3241 operation_state=nslcmop_operation_state,
3242 other_update=db_nslcmop_update,
3243 )
3244
3245 if nslcmop_operation_state:
3246 try:
3247 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3248 "operationState": nslcmop_operation_state},
3249 loop=self.loop)
3250 except Exception as e:
3251 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3252 self.logger.debug(logging_text + "Exit")
3253 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3254 return nslcmop_operation_state, detailed_status
3255
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a VNF scale-in / scale-out operation of a deployed NS.

    The request is read from the "nslcmops" record. The work is done in three stages, each one registered as a
    sub-operation so that a retried operation can skip or repeat it:
      1. pre-scale: run the vnf config primitives whose trigger is "pre-scale-in"/"pre-scale-out"
      2. RO: send the "vdu-scaling" action to RO and poll until it finishes and the vnfr is updated
      3. post-scale: run the vnf config primitives whose trigger is "post-scale-in"/"post-scale-out"
    Errors are not propagated: they are logged and stored at the "nslcmops"/"nsrs" records, and the final
    operation state is notified with the message ("ns", "scaled").

    :param nsr_id: _id of the "nsrs" record of the NS to scale
    :param nslcmop_id: _id of the "nslcmops" record that describes this scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # previous status is kept to restore it when the operation finishes
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = scale_vnf_data["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: number of scale-out operations currently applied to this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy: the counters are decremented while selecting the vdur to delete
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # last created vdur are the first ones to be deleted
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the original (not decremented) counters, removing them from VDU_SCALE_INFO
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                trigger = scaling_config_action.get("trigger")
                if (trigger == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (trigger == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable may be unbound or unrelated here
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): nslcmop_id is passed here as second positional argument, but not at the
                    # 'SCALE-RO' call of this method; confirm against _check_or_add_scale_suboperation signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                    # Update operationState = COMPLETED | FAILED
                    self._update_suboperation_status(
                        db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600  # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: poll the RO action until it is done
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            # explicit raise instead of assert, which is removed when running with -O
                            raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                    else:
                        # second phase: the RO action is done, wait for the ip addresses and update the vnfr.
                        # ns_status is not refreshed here; it holds the value obtained when the action finished
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management ip not available yet, retry at next iteration
                                pass
                        else:
                            # explicit raise instead of assert, which is removed when running with -O
                            raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    # 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10.
                    # The running loop is used (assumed to be self.loop, where this task is scheduled)
                    await asyncio.sleep(5)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same arguments as the rest of calls: (db_nslcmop, op_index, operationState, detailed_status)
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                trigger = scaling_config_action.get("trigger")
                if (trigger == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (trigger == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable may be unbound or unrelated here
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): nslcmop_id is passed here as second positional argument, but not at the
                    # 'SCALE-RO' call of this method; confirm against _check_or_add_scale_suboperation signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                    # Update operationState = COMPLETED | FAILED
                    self._update_suboperation_status(
                        db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update is done inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                # restore previous status, unless the failure left VCA or RO in an unknown state
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")