fix terminate primitives. Enhance detailed-status report
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
class NsLcm(LcmBase):
    # Default timeouts. The "minutes * 60" / "hours * 3600" spelling of the original values
    # suggests seconds -- TODO confirm against the code that consumes them.
    timeout_vca_on_error = 300  # how long a charm may stay at blocked/error status before it is marked as failed
    timeout_ns_deploy = 7200  # default global timeout for deploying a ns
    timeout_ns_terminate = 30 * 60  # default global timeout for undeploying a ns
    timeout_charm_delete = 600
    timeout_primitive = 600  # timeout for a primitive execution

    # markers used for sub-operation look-ups; all negative
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploy VCA"
57
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Builds the NS life-cycle manager and the clients it uses (N2VC juju, K8s helm, K8s juju and RO)
    :param db: database handler, forwarded to the base class
    :param msg: messaging handler, forwarded to the base class
    :param fs: filesystem storage handler, forwarded to the base class
    :param lcm_tasks: kept as self.lcm_tasks
    :param config: two level dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read here
    :param loop: kept as self.loop and handed to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop, self.lcm_tasks = loop, lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy of the VCA section of the configuration
    self.vca_config = config["VCA"].copy()
    vca = self.vca_config

    # N2VC connector; it reports changes back through self._on_update_n2vc_db
    vca_url = '{}:{}'.format(vca['host'], vca['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca.get('user', None),
        vca_config=vca,
        on_update_db=self._on_update_n2vc_db
    )

    # K8s connectors (helm and juju flavours); neither gets a database update callback
    kubectl_path = vca.get("kubectlpath")
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=vca.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # RO client, configured with the whole 'ro_config' section
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
109
110 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
111
112 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
113
114 try:
115 # TODO filter RO descriptor fields...
116
117 # write to database
118 db_dict = dict()
119 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
120 db_dict['deploymentStatus'] = ro_descriptor
121 self.update_db_2("nsrs", nsrs_id, db_dict)
122
123 except Exception as e:
124 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
125
126 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
127
128 # remove last dot from path (if exists)
129 if path.endswith('.'):
130 path = path[:-1]
131
132 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
133 # .format(table, filter, path, updated_data))
134
135 try:
136
137 nsr_id = filter.get('_id')
138
139 # read ns record from database
140 nsr = self.db.get_one(table='nsrs', q_filter=filter)
141 current_ns_status = nsr.get('nsState')
142
143 # get vca status for NS
144 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
145
146 # vcaStatus
147 db_dict = dict()
148 db_dict['vcaStatus'] = status_dict
149
150 # update configurationStatus for this VCA
151 try:
152 vca_index = int(path[path.rfind(".")+1:])
153
154 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
155 vca_status = vca_list[vca_index].get('status')
156
157 configuration_status_list = nsr.get('configurationStatus')
158 config_status = configuration_status_list[vca_index].get('status')
159
160 if config_status == 'BROKEN' and vca_status != 'failed':
161 db_dict['configurationStatus'][vca_index] = 'READY'
162 elif config_status != 'BROKEN' and vca_status == 'failed':
163 db_dict['configurationStatus'][vca_index] = 'BROKEN'
164 except Exception as e:
165 # not update configurationStatus
166 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
167
168 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
169 # if nsState = 'DEGRADED' check if all is OK
170 is_degraded = False
171 if current_ns_status in ('READY', 'DEGRADED'):
172 error_description = ''
173 # check machines
174 if status_dict.get('machines'):
175 for machine_id in status_dict.get('machines'):
176 machine = status_dict.get('machines').get(machine_id)
177 # check machine agent-status
178 if machine.get('agent-status'):
179 s = machine.get('agent-status').get('status')
180 if s != 'started':
181 is_degraded = True
182 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
183 # check machine instance status
184 if machine.get('instance-status'):
185 s = machine.get('instance-status').get('status')
186 if s != 'running':
187 is_degraded = True
188 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
189 # check applications
190 if status_dict.get('applications'):
191 for app_id in status_dict.get('applications'):
192 app = status_dict.get('applications').get(app_id)
193 # check application status
194 if app.get('status'):
195 s = app.get('status').get('status')
196 if s != 'active':
197 is_degraded = True
198 error_description += 'application {} status={} ; '.format(app_id, s)
199
200 if error_description:
201 db_dict['errorDescription'] = error_description
202 if current_ns_status == 'READY' and is_degraded:
203 db_dict['nsState'] = 'DEGRADED'
204 if current_ns_status == 'DEGRADED' and not is_degraded:
205 db_dict['nsState'] = 'READY'
206
207 # write to database
208 self.update_db_2("nsrs", nsr_id, db_dict)
209
210 except Exception as e:
211 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
212
213 return
214
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Builds the vnfd descriptor to send to RO out of an OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: if provided, it replaces the vnfd id at the returned copy
    :param additionalParams: Instantiation params for VNFs provided. Used to render cloud-init as a Jinja2 template
    :param nsrId: Id of the NSR. Not used here
    :return: a copy of vnfd without the keys unused by RO and with cloud-init content already rendered.
        LcmException is raised if the cloud-init file cannot be read or the template cannot be rendered
    """
    try:
        ro_vnfd = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param",
                           "scaling-group-descriptor", "kdu", "k8s-cluster"):
            ro_vnfd.pop(unused_key, None)
        if new_id:
            ro_vnfd["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(ro_vnfd, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file lives in the package storage described at the original vnfd
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    raw_cloud_init = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                raw_cloud_init = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable referenced by the template must come in additionalParams
            parsed = Environment().parse(raw_cloud_init)
            for var in meta.find_undeclared_variables(parsed):
                if not (additionalParams and var in additionalParams.keys()):
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(raw_cloud_init).render(additionalParams or {})

        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
272
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its "constituent-vnfd" list relates each member-vnf-index with a vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd content
    :param n2vc_key_list: keys set as "mgmt_keys" of the VDUs that require ssh access. Can be None
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM/WIM is not
        ENABLED or if an instantiation parameter references something not present at the descriptors
    """
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO one; results are cached at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # translates an OSM wim account id into the RO one; results are cached at wim_2_RO.
        # Anything that is not a string (e.g. None) is returned as it is
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile using the key names and values that are sent to RO
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the keys into every VDU that needs ssh access for its configuration
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # look for the vdu that owns the management connection point.
                # "or ()": a vdu without interfaces must not break the loop
                for vdu_interface in vdu.get("interface") or ():
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # "or ()": a nsd without constituent-vnfd has nothing to populate
            for vnf_member in nsd.get("constituent-vnfd") or ():
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor/interface_descriptor keep the values of the match found above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
543
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr and stores the resulting list at the "vnfrs" database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. The input is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. The input is not modified
    :return: None. LcmException is raised if some requested vdu-id-ref cannot be completely created/deleted
        because there are no (more) vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk backwards, so that inserting/deleting does not move the entries not visited yet
    for vdu_index in reversed(range(len(vdurs))):
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # each new entry is a clone of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done: whatever remains at the copies could not be applied
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
582
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into the nsr the information that RO reports for every vld of the ns
    :param ns_update_nsr: dictionary to be filled with the updated info, one "vld.<index>" entry per vld
    :param db_nsr: content of db_nsr. Its vld entries are modified too
    :param nsr_desc_RO: nsr descriptor from RO; its "nets" are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if RO does not report some vld
    """
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, if any
        matching_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld.update({
            "vim-id": matching_net.get("vim_net_id"),
            "name": matching_net.get("vim_name"),
            "status": matching_net.get("status"),
            "status-detailed": matching_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
604
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Marks every vnfr, and each of its vdur that has no status yet, as "ERROR", both at db_vnfrs and at database
    :param db_vnfrs: dictionary whose values are the vnfr contents. The vdur entries are modified
    :param error_text: if not empty, it is stored as "status-detailed" of the vdur marked as "ERROR"
    :return: None. A DbException is logged and not raised
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # database must get the same text as the in-memory record, not a literal "ERROR"
                        detailed = str(error_text)
                        vdur["status-detailed"] = detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
619
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Copies into every vnfr the information reported by RO (ip_address, vim_id, status...) and stores it at the
    "vnfrs" database. Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO; its "vnfs" and "nets" are read
    :return: Nothing, LcmException is raised on errors (LcmExceptionNoMgmtIP if a vnf with vdur has no IP address)
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # RO entry of this member vnf
        vnf_RO = next((item for item in nsr_desc_RO["vnfs"] if item["member_vnf_index"] == vnf_index), None)
        if vnf_RO is None:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))

        vnfr_update = {}
        if vnf_RO.get("ip_address"):
            # only the first address of a ';' separated list is kept
            mgmt_ip = vnf_RO["ip_address"].split(";")[0]
            db_vnfr["ip-address"] = mgmt_ip
            vnfr_update["ip-address"] = mgmt_ip
        elif not db_vnfr.get("ip-address") and db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

        for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
            replicas_skipped = 0
            if vdur.get("pdu-type"):
                continue
            for vm_RO in get_iterable(vnf_RO, "vms"):
                if vdur["vdu-id-ref"] != vm_RO["vdu_osm_id"]:
                    continue
                # among the vms of the same vdu, take the one at the position given by count-index
                if vdur["count-index"] != replicas_skipped:
                    replicas_skipped += 1
                    continue
                vdur["vim-id"] = vm_RO.get("vim_vm_id")
                vdur["ip-address"] = vm_RO["ip_address"].split(";")[0] if vm_RO.get("ip_address") else None
                vdur["vdu-id-ref"] = vm_RO.get("vdu_osm_id")
                vdur["name"] = vm_RO.get("vim_name")
                vdur["status"] = vm_RO.get("status")
                vdur["status-detailed"] = vm_RO.get("error_msg")
                for ifacer in get_iterable(vdur, "interfaces"):
                    iface_RO = next((item for item in get_iterable(vm_RO, "interfaces")
                                     if ifacer["name"] == item.get("internal_name")), None)
                    if iface_RO is None:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                           "from VIM info"
                                           .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    ifacer["ip-address"] = iface_RO.get("ip_address")
                    ifacer["mac-address"] = iface_RO.get("mac_address")
                vnfr_update["vdur.{}".format(vdu_index)] = vdur
                break
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                   "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

        for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
            net_RO = next((item for item in get_iterable(nsr_desc_RO, "nets")
                           if vld["id"] == item.get("vnf_net_osm_id")), None)
            if net_RO is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                    vnf_index, vld["id"]))
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            vnfr_update["vld.{}".format(vld_index)] = vld

        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
692
693 def _get_ns_config_info(self, nsr_id):
694 """
695 Generates a mapping between vnf,vdu elements and the N2VC id
696 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
697 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
698 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
699 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
700 """
701 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
702 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
703 mapping = {}
704 ns_config_info = {"osm-config-mapping": mapping}
705 for vca in vca_deployed_list:
706 if not vca["member-vnf-index"]:
707 continue
708 if not vca["vdu_id"]:
709 mapping[vca["member-vnf-index"]] = vca["application"]
710 else:
711 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
712 vca["application"]
713 return ns_config_info
714
715 @staticmethod
716 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
717 """
718 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
719 primitives as verify-ssh-credentials, or config when needed
720 :param desc_primitive_list: information of the descriptor
721 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
722 this element contains a ssh public key
723 :return: The modified list. Can ba an empty list, but always a list
724 """
725 if desc_primitive_list:
726 primitive_list = desc_primitive_list.copy()
727 else:
728 primitive_list = []
729 # look for primitive config, and get the position. None if not present
730 config_position = None
731 for index, primitive in enumerate(primitive_list):
732 if primitive["name"] == "config":
733 config_position = index
734 break
735
736 # for NS, add always a config primitive if not present (bug 874)
737 if not vca_deployed["member-vnf-index"] and config_position is None:
738 primitive_list.insert(0, {"name": "config", "parameter": []})
739 config_position = 0
740 # for VNF/VDU add verify-ssh-credentials after config
741 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
742 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
743 return primitive_list
744
745 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
746 n2vc_key_list, stage):
747 try:
748 db_nsr_update = {}
749 RO_descriptor_number = 0 # number of descriptors created at RO
750 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
751 nslcmop_id = db_nslcmop["_id"]
752 start_deploy = time()
753 ns_params = db_nslcmop.get("operationParams")
754 if ns_params and ns_params.get("timeout_ns_deploy"):
755 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
756 else:
757 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
758
759 # Check for and optionally request placement optimization. Database will be updated if placement activated
760 stage[2] = "Waiting for Placement."
761 await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
762
763 # deploy RO
764
765 # get vnfds, instantiate at RO
766 for c_vnf in nsd.get("constituent-vnfd", ()):
767 member_vnf_index = c_vnf["member-vnf-index"]
768 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
769 vnfd_ref = vnfd["id"]
770
771 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
772 db_nsr_update["detailed-status"] = " ".join(stage)
773 self.update_db_2("nsrs", nsr_id, db_nsr_update)
774 self._write_op_status(nslcmop_id, stage)
775
776 # self.logger.debug(logging_text + stage[2])
777 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
778 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
779 RO_descriptor_number += 1
780
781 # look position at deployed.RO.vnfd if not present it will be appended at the end
782 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
783 if vnf_deployed["member-vnf-index"] == member_vnf_index:
784 break
785 else:
786 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
787 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
788
789 # look if present
790 RO_update = {"member-vnf-index": member_vnf_index}
791 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
792 if vnfd_list:
793 RO_update["id"] = vnfd_list[0]["uuid"]
794 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
795 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
796 else:
797 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
798 get("additionalParamsForVnf"), nsr_id)
799 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
800 RO_update["id"] = desc["uuid"]
801 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
802 vnfd_ref, member_vnf_index, desc["uuid"]))
803 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
804 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
805
806 # create nsd at RO
807 nsd_ref = nsd["id"]
808
809 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
810 db_nsr_update["detailed-status"] = " ".join(stage)
811 self.update_db_2("nsrs", nsr_id, db_nsr_update)
812 self._write_op_status(nslcmop_id, stage)
813
814 # self.logger.debug(logging_text + stage[2])
815 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
816 RO_descriptor_number += 1
817 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
818 if nsd_list:
819 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
820 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
821 nsd_ref, RO_nsd_uuid))
822 else:
823 nsd_RO = deepcopy(nsd)
824 nsd_RO["id"] = RO_osm_nsd_id
825 nsd_RO.pop("_id", None)
826 nsd_RO.pop("_admin", None)
827 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
828 member_vnf_index = c_vnf["member-vnf-index"]
829 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
830 for c_vld in nsd_RO.get("vld", ()):
831 for cp in c_vld.get("vnfd-connection-point-ref", ()):
832 member_vnf_index = cp["member-vnf-index-ref"]
833 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
834
835 desc = await self.RO.create("nsd", descriptor=nsd_RO)
836 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
837 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
838 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
839 self.update_db_2("nsrs", nsr_id, db_nsr_update)
840
841 # Crate ns at RO
842 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
843 db_nsr_update["detailed-status"] = " ".join(stage)
844 self.update_db_2("nsrs", nsr_id, db_nsr_update)
845 self._write_op_status(nslcmop_id, stage)
846
847 # if present use it unless in error status
848 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
849 if RO_nsr_id:
850 try:
851 stage[2] = "Looking for existing ns at RO"
852 db_nsr_update["detailed-status"] = " ".join(stage)
853 self.update_db_2("nsrs", nsr_id, db_nsr_update)
854 self._write_op_status(nslcmop_id, stage)
855 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
856 desc = await self.RO.show("ns", RO_nsr_id)
857
858 except ROclient.ROClientException as e:
859 if e.http_code != HTTPStatus.NOT_FOUND:
860 raise
861 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
862 if RO_nsr_id:
863 ns_status, ns_status_info = self.RO.check_ns_status(desc)
864 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
865 if ns_status == "ERROR":
866 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
867 self.logger.debug(logging_text + stage[2])
868 await self.RO.delete("ns", RO_nsr_id)
869 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
870 if not RO_nsr_id:
871 stage[2] = "Checking dependencies"
872 db_nsr_update["detailed-status"] = " ".join(stage)
873 self.update_db_2("nsrs", nsr_id, db_nsr_update)
874 self._write_op_status(nslcmop_id, stage)
875 # self.logger.debug(logging_text + stage[2])
876
877 # check if VIM is creating and wait look if previous tasks in process
878 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
879 if task_dependency:
880 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
881 self.logger.debug(logging_text + stage[2])
882 await asyncio.wait(task_dependency, timeout=3600)
883 if ns_params.get("vnf"):
884 for vnf in ns_params["vnf"]:
885 if "vimAccountId" in vnf:
886 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
887 vnf["vimAccountId"])
888 if task_dependency:
889 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
890 self.logger.debug(logging_text + stage[2])
891 await asyncio.wait(task_dependency, timeout=3600)
892
893 stage[2] = "Checking instantiation parameters."
894 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
895 stage[2] = "Deploying ns at VIM."
896 db_nsr_update["detailed-status"] = " ".join(stage)
897 self.update_db_2("nsrs", nsr_id, db_nsr_update)
898 self._write_op_status(nslcmop_id, stage)
899
900 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
901 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
902 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
903 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
904 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
905
906 # wait until NS is ready
907 stage[2] = "Waiting VIM to deploy ns."
908 db_nsr_update["detailed-status"] = " ".join(stage)
909 self.update_db_2("nsrs", nsr_id, db_nsr_update)
910 self._write_op_status(nslcmop_id, stage)
911 detailed_status_old = None
912 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
913
914 old_desc = None
915 while time() <= start_deploy + timeout_ns_deploy:
916 desc = await self.RO.show("ns", RO_nsr_id)
917
918 # deploymentStatus
919 if desc != old_desc:
920 # desc has changed => update db
921 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
922 old_desc = desc
923
924 ns_status, ns_status_info = self.RO.check_ns_status(desc)
925 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
926 if ns_status == "ERROR":
927 raise ROclient.ROClientException(ns_status_info)
928 elif ns_status == "BUILD":
929 stage[2] = "VIM: ({})".format(ns_status_info)
930 elif ns_status == "ACTIVE":
931 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
932 try:
933 self.ns_update_vnfr(db_vnfrs, desc)
934 break
935 except LcmExceptionNoMgmtIP:
936 pass
937 else:
938 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
939 if stage[2] != detailed_status_old:
940 detailed_status_old = stage[2]
941 db_nsr_update["detailed-status"] = " ".join(stage)
942 self.update_db_2("nsrs", nsr_id, db_nsr_update)
943 self._write_op_status(nslcmop_id, stage)
944 await asyncio.sleep(5, loop=self.loop)
945 else: # timeout_ns_deploy
946 raise ROclient.ROClientException("Timeout waiting ns to be ready")
947
948 # Updating NSR
949 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
950
951 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
952 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
953 stage[2] = "Deployed at VIM"
954 db_nsr_update["detailed-status"] = " ".join(stage)
955 self.update_db_2("nsrs", nsr_id, db_nsr_update)
956 self._write_op_status(nslcmop_id, stage)
957 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
958 # self.logger.debug(logging_text + "Deployed at VIM")
959 except (ROclient.ROClientException, LcmException, DbException) as e:
960 self.set_vnfr_at_error(db_vnfrs, str(e))
961 raise
962
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait until the target VNF/VDU has an IP address and, optionally, insert a public ssh key in the virtual machine.

    The "vnfrs" record is polled every 10 seconds. Once the target vdur has been located and has an ip-address, and
    both pub_key and user are provided, the key is injected with an "add_public_key" action on the RO ns instance,
    retrying while RO answers with an error.
    :param logging_text: prefix use for logging
    :param nsr_id: "_id" of the "nsrs" record; read to obtain _admin.deployed.RO.nsr_id when a key must be injected
    :param vnfr_id: "_id" of the "vnfrs" record that contains the target
    :param vdu_id: "vdu-id-ref" of the target vdur. Falsy for the VNF case: the vdur owning the VNF ip-address is used
    :param vdu_index: "count-index" of the target vdur. Only compared in the VDU case
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: after 360 loop passes; when the VNF or the target VM is in ERROR status; when no vdur
        matches the target; when RO returns an unknown response; or after 20 failed key injections
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None        # RO id of the ns instance, cached once it appears at the nsrs record
    ip_address = None
    nb_tries = 0            # number of failed key injections
    target_vdu_id = None    # set once the target vdur is located and has an ip-address; stops further vnfr reads
    ro_retries = 0          # number of loop passes, whatever each pass has been waiting for

    while True:

        ro_retries += 1
        # NOTE: this counter is incremented on every pass (waiting for ip-address, for RO nsr_id or retrying the key
        # injection), so the text of this error may not describe what was really being waited for
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        await asyncio.sleep(10, loop=self.loop)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                # the target is the vdur that owns the VNF management ip-address
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            # a PDU is accepted whatever its status; a VM must be ACTIVE. Any other non ERROR status keeps polling
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                # no key is injected for a PDU: log the error and return the address found
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                # a result with vim_result 200 ends the check; any other result is raised as ROClientException
                # so that the handler below counts it and the loop retries
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # only the first failure is logged; give up after 20 failures (one try per 10 seconds pass)
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            # nothing to inject: having the ip-address is enough
            break

    return ip_address
1064
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the configurations this VCA depends on are READY. NS wait for VNFs and VDUs. VNFs for VDUs.

    A VCA that belongs to a VDU or a KDU has no dependencies and returns at once. Otherwise the "nsrs" record is
    polled every 10 seconds and the "status" of the other "configurationStatus" entries is inspected.
    :param nsr_id: "_id" of the "nsrs" record to poll
    :param vca_deployed_list: content of _admin.deployed.VCA; only the entry at vca_index is read
    :param vca_index: position of this VCA at the list; the entry at the same position is skipped
    :return: None, when every inspected entry is READY
    :raises LcmException: when an inspected entry is BROKEN or when the polling budget is exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: nothing to wait for
        return

    # NOTE(review): the budget is a number of polls, not seconds: with a 10 seconds sleep this is about 50 minutes.
    # Confirm that this is the intended timeout
    polls_left = 300
    while polls_left >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # re-read but not used afterwards; kept because the lookup fails if the path is missing at the record
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]
        own_member_index = own_vca.get("member-vnf-index")

        still_pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            # NOTE(review): "member-vnf-index" is looked up at the configurationStatus entry, not at the VCA
            # entry of the same position; verify that configurationStatus entries carry that key
            if own_member_index and entry.get("member-vnf-index") != own_member_index:
                # belongs to another vnf: not a dependency
                continue
            peer_status = entry.get("status")
            if peer_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if peer_status != 'READY':
                still_pending = True
                break

        if not still_pending:
            # no dependencies pending, return
            return

        await asyncio.sleep(10)
        polls_left -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1098
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
    """
    Deploy one VCA (charm) for a NS, VNF or VDU and run its initial config primitives.

    Steps, in order: create (proxy charm) or register (native charm) the execution environment; store model,
    application and ee_id at the nsrs record; install the configuration software; add the VCA relations; for a
    proxy charm wait for the VM ip-address and optionally inject the execution environment ssh key; wait for the
    dependent VCAs; execute every initial config primitive. The configurationStatus entry of this VCA is updated
    along the way (CREATING/REGISTERING, INSTALLING SW, EXECUTING PRIMITIVE, READY or BROKEN).
    :param logging_text: prefix use for logging
    :param vca_index: position of this VCA at db_nsr["_admin"]["deployed"]["VCA"] and at configurationStatus
    :param nsi_id: network slice instance id used as first part of the namespace; empty if falsy
    :param db_nsr: content of the nsrs record. "_id" and "_admin.deployed.VCA" are read
    :param db_vnfr: content of the vnfrs record for a VNF/VDU charm; falsy for a NS charm
    :param vdu_id: vdu identifier for a VDU charm; falsy otherwise
    :param kdu_name: not read by this method
    :param vdu_index: vdu count index; 0 is used for naming when falsy
    :param config_descriptor: configuration section of the descriptor ("juju", "config-access",
        "initial-config-primitive", "terminate-config-primitive" are read)
    :param deploy_params: parameters used for primitive params mapping. It is modified: "rw_mgmt_ip" is always
        set and "ns_config_info" is set for a NS charm
    :param base_folder: dictionary with "folder" and "pkg-dir", used to compose the charm artifact path
    :param nslcmop_id: operation id where the stage is written
    :param stage: list with the operation progress texts. stage[0] is overwritten here
    :return: None
    :raises LcmException: wrapping any exception raised inside, prefixed with the step that was in progress
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    # handed over to every n2vc call below, pointing at the database entry of this VCA
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""   # text of the action in progress; used as prefix of the exception raised at the end
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace has the form "<nsi>.<ns>[.<vnfr>[.<vdu>-<index>]]"
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        # NOTE: "juju" and "charm" are indexed directly, so a descriptor without them fails here (KeyError wrapped
        # into LcmException below) before is_proxy_charm is evaluated
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor contains juju.proxy set to False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # native charm: the VM itself is registered, so its address and a username are needed first
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                # in-place sort: the list inside config_descriptor is reordered
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # a missing or non numeric 'seq' is only logged; the list is used as it has been left
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        # becomes False after the first primitive, so that "needed_terminate" is evaluated only once
        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # the traceback is logged only for exception types that are not listed here
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1348
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, other_update: dict = None):
    """
    Update the status fields of a "nsrs" record. A database error is logged as a warning, not raised.
    :param nsr_id: "_id" of the nsrs record to update
    :param ns_state: new "nsState". It is not written if empty or None
    :param current_operation: written to "currentOperation"; "_admin.operation-type" gets the same value,
        or None when it is "IDLE"
    :param current_operation_id: written to "currentOperationID", "_admin.current-operation" and "_admin.nslcmop"
    :param error_description: written to "errorDescription" (None included)
    :param other_update: Other required changes at database if provided, will be cleared. The status keys are
        added to this same dictionary, so the object of the caller is modified
    :return: None
    """
    try:
        # the dictionary of the caller is reused on purpose, not copied
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id   # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1375
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the progress fields of a "nslcmops" record. A database error is logged as a warning, not raised.
    :param op_id: "_id" of the nslcmops record to update
    :param stage: if a list, its first item is written to "stage" and all the items joined by a blank to
        "detailed-status"; any other not None value is written to "stage" as text
    :param error_message: written to "errorMessage" if not None
    :param queuePosition: always written to "queuePosition"
    :param operation_state: if not None it is written to "operationState" and "statusEnteredTime" is set to now
    :param other_update: other changes to write. The status keys are added to this same dictionary
    :return: None
    """
    try:
        # the dictionary of the caller is reused on purpose, not copied
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1395
def _write_all_config_status(self, nsr_id: str, status: str):
    """
    Set the same "status" at every entry of "configurationStatus" of a "nsrs" record.

    The record is read, each entry gets its "status" replaced and the whole list is written back. Nothing is
    written when the record has no "configurationStatus" or it is empty. A database error is logged as a
    warning, not raised.
    :param nsr_id: "_id" of the nsrs record
    :param status: value to store at every entry
    :return: None
    """
    try:
        # nsrs record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # update status; the entries read from database are modified in place and reused
            for config_entry in config_status:
                config_entry['status'] = status
            self.update_db_2("nsrs", nsr_id, {'configurationStatus': list(config_status)})

    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1413
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None):
    """
    Update the "configurationStatus" entry of one VCA at a "nsrs" record. Only the non empty values are written.
    A database error is logged as a warning, not raised.
    :param nsr_id: "_id" of the nsrs record
    :param vca_index: position of the entry inside "configurationStatus"
    :param status: written to "status" if provided
    :param element_under_configuration: written to "elementUnderConfiguration" if provided
    :param element_type: written to "elementType" if provided
    :return: None
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = dict()
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1433
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement optimization and store the selected VIM accounts, when the operation asks for it.

    Nothing is done unless operationParams.placement-engine of the operation is "PLA". In that case a
    "get_placement" message is written to the "pla" topic and the nslcmops record is polled every 5 seconds,
    up to 5 times, until "_admin.pla" is filled. For every vnf of the result that has a "vimAccountId" and is
    present at db_vnfrs, "vim-account-id" is set at its vnfrs record.
    :param logging_text: prefix use for logging
    :param db_nslcmop: content of the nslcmops record of the operation
    :param db_vnfrs: vnfrs content indexed by member-vnf-index
    :return: None
    :raises LcmException: when no placement result is found after polling
    """
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        return

    # "_id" is the key used for the message and the database queries; use it for the texts as well, instead of
    # "id", so that a record without "id" cannot turn the log or the timeout error into a KeyError
    nslcmop_id = db_nslcmop['_id']
    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(nslcmop_id))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    db_poll_interval = 5
    wait = db_poll_interval * 4
    pla_result = None
    while not pla_result and wait >= 0:
        await asyncio.sleep(db_poll_interval)
        wait -= db_poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    for pla_vnf in pla_result['vnf']:
        vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
        if not pla_vnf.get('vimAccountId') or not vnfr:
            # no VIM selected for this vnf, or vnf not part of this ns
            continue
        self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1457
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result at "_admin.pla" of the "nslcmops" record it refers to.

    Any failure is logged as a warning and not raised.
    :param params: content received; "placement" is stored and "placement.nslcmopId" selects the record
    :return: None
    """
    # bound before the try, so that the handler can always format it, whatever statement fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1464
1465 async def instantiate(self, nsr_id, nslcmop_id):
1466 """
1467
1468 :param nsr_id: ns instance to deploy
1469 :param nslcmop_id: operation to run
1470 :return:
1471 """
1472
1473 # Try to lock HA task here
1474 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1475 if not task_is_locked_by_me:
1476 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1477 return
1478
1479 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1480 self.logger.debug(logging_text + "Enter")
1481
1482 # get all needed from database
1483
1484 # database nsrs record
1485 db_nsr = None
1486
1487 # database nslcmops record
1488 db_nslcmop = None
1489
1490 # update operation on nsrs
1491 db_nsr_update = {}
1492 # update operation on nslcmops
1493 db_nslcmop_update = {}
1494
1495 nslcmop_operation_state = None
1496 db_vnfrs = {} # vnf's info indexed by member-index
1497 # n2vc_info = {}
1498 tasks_dict_info = {} # from task to info text
1499 exc = None
1500 error_list = []
1501 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1502 # ^ stage, step, VIM progress
1503 try:
1504 # wait for any previous tasks in process
1505 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1506
1507 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1508 stage[1] = "Reading from database,"
1509 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1510 db_nsr_update["detailed-status"] = "creating"
1511 db_nsr_update["operational-status"] = "init"
1512 self._write_ns_status(
1513 nsr_id=nsr_id,
1514 ns_state="BUILDING",
1515 current_operation="INSTANTIATING",
1516 current_operation_id=nslcmop_id,
1517 other_update=db_nsr_update
1518 )
1519 self._write_op_status(
1520 op_id=nslcmop_id,
1521 stage=stage,
1522 queuePosition=0
1523 )
1524
1525 # read from db: operation
1526 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1527 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1528 ns_params = db_nslcmop.get("operationParams")
1529 if ns_params and ns_params.get("timeout_ns_deploy"):
1530 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1531 else:
1532 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1533
1534 # read from db: ns
1535 stage[1] = "Getting nsr={} from db".format(nsr_id)
1536 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1537 # nsd is replicated into ns (no db read)
1538 nsd = db_nsr["nsd"]
1539 # nsr_name = db_nsr["name"] # TODO short-name??
1540
1541 # read from db: vnf's of this ns
1542 stage[1] = "Getting vnfrs from db"
1543 self.logger.debug(logging_text + stage[1])
1544 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1545
1546 # read from db: vnfd's for every vnf
1547 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1548 db_vnfds = {} # every vnfd data indexed by vnf id
1549 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1550
1551 # for each vnf in ns, read vnfd
1552 for vnfr in db_vnfrs_list:
1553 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1554 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1555 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1556 # if we haven't this vnfd, read it from db
1557 if vnfd_id not in db_vnfds:
1558 # read from db
1559 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1560 self.logger.debug(logging_text + stage[1])
1561 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1562
1563 # store vnfd
1564 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1565 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1566 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1567
1568 # Get or generates the _admin.deployed.VCA list
1569 vca_deployed_list = None
1570 if db_nsr["_admin"].get("deployed"):
1571 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1572 if vca_deployed_list is None:
1573 vca_deployed_list = []
1574 configuration_status_list = []
1575 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1576 db_nsr_update["configurationStatus"] = configuration_status_list
1577 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1578 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1579 elif isinstance(vca_deployed_list, dict):
1580 # maintain backward compatibility. Change a dict to list at database
1581 vca_deployed_list = list(vca_deployed_list.values())
1582 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1583 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1584
1585 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1586 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1587 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1588
1589 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1590 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1591 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1592
1593 # n2vc_redesign STEP 2 Deploy Network Scenario
1594 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1595 self._write_op_status(
1596 op_id=nslcmop_id,
1597 stage=stage
1598 )
1599
1600 stage[1] = "Deploying KDUs,"
1601 # self.logger.debug(logging_text + "Before deploy_kdus")
1602 # Call to deploy_kdus in case exists the "vdu:kdu" param
1603 await self.deploy_kdus(
1604 logging_text=logging_text,
1605 nsr_id=nsr_id,
1606 nslcmop_id=nslcmop_id,
1607 db_vnfrs=db_vnfrs,
1608 db_vnfds=db_vnfds,
1609 task_instantiation_info=tasks_dict_info,
1610 )
1611
1612 stage[1] = "Getting VCA public key."
1613 # n2vc_redesign STEP 1 Get VCA public ssh-key
1614 # feature 1429. Add n2vc public key to needed VMs
1615 n2vc_key = self.n2vc.get_public_key()
1616 n2vc_key_list = [n2vc_key]
1617 if self.vca_config.get("public_key"):
1618 n2vc_key_list.append(self.vca_config["public_key"])
1619
1620 stage[1] = "Deploying NS at VIM."
1621 task_ro = asyncio.ensure_future(
1622 self.instantiate_RO(
1623 logging_text=logging_text,
1624 nsr_id=nsr_id,
1625 nsd=nsd,
1626 db_nsr=db_nsr,
1627 db_nslcmop=db_nslcmop,
1628 db_vnfrs=db_vnfrs,
1629 db_vnfds_ref=db_vnfds_ref,
1630 n2vc_key_list=n2vc_key_list,
1631 stage=stage
1632 )
1633 )
1634 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1635 tasks_dict_info[task_ro] = "Deploy at VIM"
1636
1637 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1638 stage[1] = "Deploying Execution Environments."
1639 self.logger.debug(logging_text + stage[1])
1640
1641 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1642 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1643 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1644 vnfd_id = c_vnf["vnfd-id-ref"]
1645 vnfd = db_vnfds_ref[vnfd_id]
1646 member_vnf_index = str(c_vnf["member-vnf-index"])
1647 db_vnfr = db_vnfrs[member_vnf_index]
1648 base_folder = vnfd["_admin"]["storage"]
1649 vdu_id = None
1650 vdu_index = 0
1651 vdu_name = None
1652 kdu_name = None
1653
1654 # Get additional parameters
1655 deploy_params = {}
1656 if db_vnfr.get("additionalParamsForVnf"):
1657 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1658
1659 descriptor_config = vnfd.get("vnf-configuration")
1660 if descriptor_config and descriptor_config.get("juju"):
1661 self._deploy_n2vc(
1662 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1663 db_nsr=db_nsr,
1664 db_vnfr=db_vnfr,
1665 nslcmop_id=nslcmop_id,
1666 nsr_id=nsr_id,
1667 nsi_id=nsi_id,
1668 vnfd_id=vnfd_id,
1669 vdu_id=vdu_id,
1670 kdu_name=kdu_name,
1671 member_vnf_index=member_vnf_index,
1672 vdu_index=vdu_index,
1673 vdu_name=vdu_name,
1674 deploy_params=deploy_params,
1675 descriptor_config=descriptor_config,
1676 base_folder=base_folder,
1677 task_instantiation_info=tasks_dict_info,
1678 stage=stage
1679 )
1680
1681 # Deploy charms for each VDU that supports one.
1682 for vdud in get_iterable(vnfd, 'vdu'):
1683 vdu_id = vdud["id"]
1684 descriptor_config = vdud.get('vdu-configuration')
1685 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1686 if vdur.get("additionalParams"):
1687 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1688 else:
1689 deploy_params_vdu = deploy_params
1690 if descriptor_config and descriptor_config.get("juju"):
1691 # look for vdu index in the db_vnfr["vdu"] section
1692 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1693 # if vdur["vdu-id-ref"] == vdu_id:
1694 # break
1695 # else:
1696 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1697 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1698 # vdu_name = vdur.get("name")
1699 vdu_name = None
1700 kdu_name = None
1701 for vdu_index in range(int(vdud.get("count", 1))):
1702 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1703 self._deploy_n2vc(
1704 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1705 member_vnf_index, vdu_id, vdu_index),
1706 db_nsr=db_nsr,
1707 db_vnfr=db_vnfr,
1708 nslcmop_id=nslcmop_id,
1709 nsr_id=nsr_id,
1710 nsi_id=nsi_id,
1711 vnfd_id=vnfd_id,
1712 vdu_id=vdu_id,
1713 kdu_name=kdu_name,
1714 member_vnf_index=member_vnf_index,
1715 vdu_index=vdu_index,
1716 vdu_name=vdu_name,
1717 deploy_params=deploy_params_vdu,
1718 descriptor_config=descriptor_config,
1719 base_folder=base_folder,
1720 task_instantiation_info=tasks_dict_info
1721 )
1722 for kdud in get_iterable(vnfd, 'kdu'):
1723 kdu_name = kdud["name"]
1724 descriptor_config = kdud.get('kdu-configuration')
1725 if descriptor_config and descriptor_config.get("juju"):
1726 vdu_id = None
1727 vdu_index = 0
1728 vdu_name = None
1729 # look for vdu index in the db_vnfr["vdu"] section
1730 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1731 # if vdur["vdu-id-ref"] == vdu_id:
1732 # break
1733 # else:
1734 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1735 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1736 # vdu_name = vdur.get("name")
1737 # vdu_name = None
1738
1739 self._deploy_n2vc(
1740 logging_text=logging_text,
1741 db_nsr=db_nsr,
1742 db_vnfr=db_vnfr,
1743 nslcmop_id=nslcmop_id,
1744 nsr_id=nsr_id,
1745 nsi_id=nsi_id,
1746 vnfd_id=vnfd_id,
1747 vdu_id=vdu_id,
1748 kdu_name=kdu_name,
1749 member_vnf_index=member_vnf_index,
1750 vdu_index=vdu_index,
1751 vdu_name=vdu_name,
1752 deploy_params=deploy_params,
1753 descriptor_config=descriptor_config,
1754 base_folder=base_folder,
1755 task_instantiation_info=tasks_dict_info
1756 )
1757
1758 # Check if this NS has a charm configuration
1759 descriptor_config = nsd.get("ns-configuration")
1760 if descriptor_config and descriptor_config.get("juju"):
1761 vnfd_id = None
1762 db_vnfr = None
1763 member_vnf_index = None
1764 vdu_id = None
1765 kdu_name = None
1766 vdu_index = 0
1767 vdu_name = None
1768
1769 # Get additional parameters
1770 deploy_params = {}
1771 if db_nsr.get("additionalParamsForNs"):
1772 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1773 base_folder = nsd["_admin"]["storage"]
1774 self._deploy_n2vc(
1775 logging_text=logging_text,
1776 db_nsr=db_nsr,
1777 db_vnfr=db_vnfr,
1778 nslcmop_id=nslcmop_id,
1779 nsr_id=nsr_id,
1780 nsi_id=nsi_id,
1781 vnfd_id=vnfd_id,
1782 vdu_id=vdu_id,
1783 kdu_name=kdu_name,
1784 member_vnf_index=member_vnf_index,
1785 vdu_index=vdu_index,
1786 vdu_name=vdu_name,
1787 deploy_params=deploy_params,
1788 descriptor_config=descriptor_config,
1789 base_folder=base_folder,
1790 task_instantiation_info=tasks_dict_info
1791 )
1792
1793 # rest of staff will be done at finally
1794
1795 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1796 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1797 exc = e
1798 except asyncio.CancelledError:
1799 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1800 exc = "Operation was cancelled"
1801 except Exception as e:
1802 exc = traceback.format_exc()
1803 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1804 finally:
1805 if exc:
1806 error_list.append(str(exc))
1807 try:
1808 # wait for pending tasks
1809 if tasks_dict_info:
1810 stage[1] = "Waiting for instantiate pending tasks."
1811 self.logger.debug(logging_text + stage[1])
1812 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1813 stage, nslcmop_id, nsr_id=nsr_id)
1814 stage[1] = stage[2] = ""
1815 except asyncio.CancelledError:
1816 error_list.append("Cancelled")
1817 # TODO cancel all tasks
1818 except Exception as exc:
1819 error_list.append(str(exc))
1820
1821 # update operation-status
1822 db_nsr_update["operational-status"] = "running"
1823 # let's begin with VCA 'configured' status (later we can change it)
1824 db_nsr_update["config-status"] = "configured"
1825 for task, task_name in tasks_dict_info.items():
1826 if not task.done() or task.cancelled() or task.exception():
1827 if task_name.startswith(self.task_name_deploy_vca):
1828 # A N2VC task is pending
1829 db_nsr_update["config-status"] = "failed"
1830 else:
1831 # RO or KDU task is pending
1832 db_nsr_update["operational-status"] = "failed"
1833
1834 # update status at database
1835 if error_list:
1836 error_detail = "; ".join(error_list)
1837 self.logger.error(logging_text + error_detail)
1838 error_description_nslcmop = 'Stage: {} {} Detail: {}'.format(stage[0], stage[1], error_detail)
1839 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, error_description_nslcmop)
1840
1841 db_nsr_update["detailed-status"] = error_description_nsr
1842 db_nslcmop_update["detailed-status"] = error_detail
1843 nslcmop_operation_state = "FAILED"
1844 ns_state = "BROKEN"
1845 else:
1846 error_description_nsr = error_description_nslcmop = None
1847 ns_state = "READY"
1848 db_nsr_update["detailed-status"] = "Done"
1849 db_nslcmop_update["detailed-status"] = "Done"
1850 nslcmop_operation_state = "COMPLETED"
1851
1852 if db_nsr:
1853 self._write_ns_status(
1854 nsr_id=nsr_id,
1855 ns_state=ns_state,
1856 current_operation="IDLE",
1857 current_operation_id=None,
1858 error_description=error_description_nsr,
1859 other_update=db_nsr_update
1860 )
1861 if db_nslcmop:
1862 self._write_op_status(
1863 op_id=nslcmop_id,
1864 stage="",
1865 error_message=error_description_nslcmop,
1866 operation_state=nslcmop_operation_state,
1867 other_update=db_nslcmop_update,
1868 )
1869
1870 if nslcmop_operation_state:
1871 try:
1872 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1873 "operationState": nslcmop_operation_state},
1874 loop=self.loop)
1875 except Exception as e:
1876 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1877
1878 self.logger.debug(logging_text + "Exit")
1879 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1880
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations declared at the descriptors in which the VCA at position vca_index takes part.

    Steps:
        1. find all the ns-configuration and vnf-configuration relations that involve this VCA
        2. wait until both peers of a relation have their configuration software installed
        3. add the relation. A relation with a peer reported as BROKEN is discarded

    :param logging_text: prefix added to the log messages
    :param nsr_id: _id of the nsr record at database
    :param vca_index: position of this VCA at db_nsr._admin.deployed.VCA
    :param timeout: maximum time in seconds waiting for the peers to be ready
    :return: True when no relation is left pending (all of them added or discarded).
        False on timeout or on any exception different from a cancellation
    """

    def is_involved(relation, entity_id):
        entities = relation.get('entities')
        return entity_id in (entities[0].get('id'), entities[1].get('id'))

    async def process_relations(relations, vca_key, db_nsr):
        """
        Try to add every relation of the list.
        :param relations: list of pending relations
        :param vca_key: key of the VCA record compared with the entity id: 'member-vnf-index' or 'vdu_id'
        :param db_nsr: nsr record just read from database
        :return: list of relations that must be tried again later
        """
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        vca_status_list = db_nsr.get('configurationStatus') or ()
        still_pending = []
        # A new list is built instead of removing items from the list being iterated,
        # so no relation is skipped and no relation is removed twice
        for r in relations:
            from_entity = r.get('entities')[0]
            to_entity = r.get('entities')[1]
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                if not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_entity.get('id'):
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_entity.get('id'):
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # both peers are ready: add relation
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=from_entity.get('endpoint'),
                    endpoint_2=to_entity.get('endpoint'))
                continue

            # check failed peers. configurationStatus is read by position, same index as the VCA list
            peer_ids = (from_entity.get('id'), to_entity.get('id'))
            peer_broken = any(
                vca.get(vca_key) in peer_ids and (vca_status or {}).get('status') == 'BROKEN'
                for vca, vca_status in zip(vca_list, vca_status_list))
            if peer_broken:
                # peer broken: this relation will never be possible, do not wait for it
                self.logger.debug(logging_text + ' relation discarded because a peer is BROKEN: {}'.format(r))
            else:
                still_pending.append(r)
        return still_pending

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # ns-configuration relations are matched by member-vnf-index
        ns_relations = [
            r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
            if is_involved(r, my_vca.get('member-vnf-index'))]

        # vnf-configuration relations of every vnfd of this ns are matched by vdu_id
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            vnf_relations += [
                r for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()
                if is_involved(r, my_vca.get('vdu_id'))]

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEPS 2 and 3: retry until every relation is added or discarded, or until timeout
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database to get the current _admin.deployed.VCA and configurationStatus
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            ns_relations = await process_relations(ns_relations, 'member-vnf-index', db_nsr)
            vnf_relations = await process_relations(vnf_relations, 'vdu_id', db_nsr)

            # wait for next try
            await asyncio.sleep(5.0)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

        return True

    except asyncio.CancelledError:
        # a cancellation must reach the caller, it is not an error adding relations
        raise
    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2040
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the deployment of the KDUs present at the vnf records.

    For every 'kdur' of every vnfr: the kdu model is obtained ('helm-chart' or 'juju-bundle'), the id of the
    k8s cluster for that type is read from the 'k8sclusters' collection, helm repos are synchronized (once per
    cluster), an entry is written at nsr '_admin.deployed.K8s.<index>' and the install is launched as an asyncio
    task. This method does not wait for the install tasks; they are registered at self.lcm_tasks and added to
    task_instantiation_info.

    :param logging_text: prefix added to the log messages
    :param nsr_id: _id of the nsr record at database
    :param nslcmop_id: _id of the operation, used to register the tasks
    :param db_vnfrs: dictionary of vnf records; only its values are used
    :param db_vnfds: dictionary of vnfd indexed by vnfd _id, used to get the package folder
    :param task_instantiation_info: dictionary where every launched task is added: {task: description}
    :return: None
    :raises LcmException: if the kdu type is unknown, the k8s cluster is not found or not initialized, or on
        any other exception different from a cancellation
    """
    # ids already obtained from database: {cluster type: {k8scluster _id: id of the cluster for this type}}
    resolved_cluster_ids = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        known_ids = resolved_cluster_ids[cluster_type]
        if cluster_id in known_ids:
            return known_ids[cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        known_ids[cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        # start with an empty list of deployed KDUs at database
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))

                # the kdu type is the first of these keys with content at the kdur
                for k8sclustertype in ("helm-chart", "juju-bundle"):
                    kdumodel = kdur.get(k8sclustertype)
                    if kdumodel:
                        break
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))

                # if kdumodel is a file or folder of the package, use its full path. Otherwise leave it as it is
                try:
                    # path format: /vnfdid/pkkdir/kdumodel
                    filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
                    if any(self.fs.file_exists(filename, mode=fs_mode) for fs_mode in ('file', 'dir')):
                        kdumodel = self.fs.path + filename
                except asyncio.CancelledError:
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)

                if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        repo_prefix = '_admin.helm_charts_added.'
                        unset = {repo_prefix + item: None for item in del_repo_list}
                        updated = {repo_prefix + item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                          "to_add: {}".format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                    # repos of this cluster are synchronized only once
                    updated_cluster_list.append(cluster_uuid)

                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)

                # write the entry of this KDU at database before launching the install
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = {
                    "kdu-instance": None,
                    "k8scluster-uuid": cluster_uuid,
                    "k8scluster-type": k8sclustertype,
                    "kdu-name": kdur["kdu-name"],
                    "kdu-model": kdumodel,
                }
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # location at database of this KDU, provided to the k8s connector
                db_dict = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": db_path}

                if k8sclustertype == "helm-chart":
                    install = self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                          atomic=True, params=desc_params, db_dict=db_dict,
                                                          timeout=3600)
                else:
                    install = self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                          atomic=True, params=desc_params,
                                                          db_dict=db_dict, timeout=600,
                                                          kdu_name=kdur["kdu-name"])
                task = asyncio.ensure_future(install)

                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploy KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if not isinstance(e, (N2VCException, DbException)):
            # unexpected exception: log it with traceback
            self.logger.critical(logging_text + msg, exc_info=True)
        else:
            self.logger.error(logging_text + msg)
        raise LcmException(msg)
    finally:
        # write whatever is still pending
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2155
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage=None):
    """
    Launch instantiate_N2VC in an asyncio task and register the task object.

    Looks for the entry of this charm at db_nsr["_admin"]["deployed"]["VCA"], matching member-vnf-index,
    vdu_id, kdu_name and vdu_count_index. If not found, a new entry is created, written at database
    (together with an empty configurationStatus entry at the same index) and appended to the list at db_nsr.

    :param logging_text: prefix added to the log messages
    :param db_nsr: nsr record. Its _admin.deployed.VCA list can be extended by this method
    :param db_vnfr: vnf record, or None
    :param nslcmop_id: _id of the operation, used to register the task
    :param nsr_id: _id of the nsr record at database
    :param nsi_id: _id of the network slice instance, or None
    :param vnfd_id: vnfd identifier stored at a new VCA entry, or None
    :param vdu_id: vdu identifier, or None
    :param kdu_name: kdu name, or None
    :param member_vnf_index: member index of the vnf, or None
    :param vdu_index: count index of the vdu
    :param vdu_name: stored at a new VCA entry
    :param deploy_params: parameters passed to instantiate_N2VC
    :param descriptor_config: configuration descriptor, passed to instantiate_N2VC as config_descriptor
    :param base_folder: passed to instantiate_N2VC
    :param task_instantiation_info: dictionary where the launched task is added: {task: description}
    :param stage: list passed to instantiate_N2VC. Optional because several callers of this method do not
        provide it; without a default those calls fail with TypeError
    :return: None
    """
    if stage is None:
        # NOTE(review): placeholder so that callers not providing 'stage' do not fail. Three items are used
        # because the instantiate code indexes stage[0] to stage[2]; how instantiate_N2VC uses it is not
        # visible here. Callers should provide the shared stage list - TODO confirm
        stage = ["", "", ""]

    # fill db_nsr._admin.deployed.VCA.<index>
    vca_index = -1
    for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
        if not vca_deployed:
            # the list can contain empty entries
            continue
        if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                vca_deployed.get("vdu_id") == vdu_id and \
                vca_deployed.get("kdu_name") == kdu_name and \
                vca_deployed.get("vdu_count_index", 0) == vdu_index:
            break
    else:
        # not found, create one.
        vca_deployed = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",  # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }
        # vca_index is here the last index of the list (-1 if empty), so the new entry goes to the next one
        vca_index += 1

        # create VCA and configurationStatus in db
        db_dict = {
            "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
            "configurationStatus.{}".format(vca_index): dict()
        }
        self.update_db_2("nsrs", nsr_id, db_dict)

        db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

    # Launch task
    task_n2vc = asyncio.ensure_future(
        self.instantiate_N2VC(
            logging_text=logging_text,
            vca_index=vca_index,
            nsi_id=nsi_id,
            db_nsr=db_nsr,
            db_vnfr=db_vnfr,
            vdu_id=vdu_id,
            kdu_name=kdu_name,
            vdu_index=vdu_index,
            deploy_params=deploy_params,
            config_descriptor=descriptor_config,
            base_folder=base_folder,
            nslcmop_id=nslcmop_id,
            stage=stage
        )
    )
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
    task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
        member_vnf_index or "", vdu_id or "")
2218
2219 # Check if this VNFD has a configured terminate action
def _has_terminate_config_primitive(self, vnfd):
    """
    Check if this VNFD has a configured terminate action
    :param vnfd: vnfd descriptor (dictionary)
    :return: True if 'vnf-configuration' exists and contains a non empty 'terminate-config-primitive'
    """
    vnf_config = vnfd.get("vnf-configuration")
    if not vnf_config:
        return False
    return bool(vnf_config.get("terminate-config-primitive"))
2226
@staticmethod
def _get_terminate_config_primitive_seq_list(vnfd):
    """
    Get the terminate-config-primitive entries of this VNFD ordered by 'seq'
    :param vnfd: vnfd descriptor. It must contain vnf-configuration.terminate-config-primitive; this is not
        checked here because it is expected to be checked before calling
    :return: new list with the entries ordered numerically, ascending, by the integer value of 'seq'
    """
    terminate_primitives = vnfd.get("vnf-configuration").get("terminate-config-primitive")
    # 'seq' is converted to integer, so that "10" goes after "2"
    ordered = list(terminate_primitives)
    ordered.sort(key=lambda item: int(item['seq']))
    return ordered
2236
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
    """
    Creates a ns-lcm-opp content to be stored at database.
    The operation is created in 'PROCESSING' state, with a new uuid used for both 'id' and '_id', and the
    current time as 'startTime' and 'statusEnteredTime'.
    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    :raises LcmException: if any of the three arguments is empty
    """
    # Raise exception if invalid arguments
    if not nsr_id or not operation or not params:
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
    op_id = str(uuid4())
    timestamp = time()
    links = {
        "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + op_id,
        "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
    }
    return {
        "id": op_id,
        "_id": op_id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": timestamp,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": timestamp,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": links,
    }
2270
def _format_additional_params(self, params):
    """
    Decode the additional params whose text starts with the "!!yaml " mark.
    The text after the mark is loaded as yaml and the result replaces the value.
    :param params: dictionary of parameters, modified in place. Can be None or empty
    :return: the same dictionary received, or a new empty dictionary if params is None or empty
    """
    if not params:
        params = {}
    yaml_mark = "!!yaml "
    for key in list(params):
        value = params[key]
        if str(value).startswith(yaml_mark):
            params[key] = yaml.safe_load(value[len(yaml_mark):])
    return params
2277
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Get the parameters of a terminate primitive, calling _map_primitive_params with empty user params
    :param seq: entry of terminate-config-primitive; its 'name' is the primitive name
    :param vnf_index: member index of the vnf
    :return: what _map_primitive_params returns
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        "primitive_params": {},
    }
    return self._map_primitive_params(seq, params, {})
2288
2289 # sub-operations
2290
def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide if an existing sub-operation must be skipped or executed again
    :param db_nslcmop: operation record; the sub-operation is read from its _admin.operations list
    :param op_index: index of the sub-operation at that list
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'. Otherwise op_index, after marking
        the sub-operation at database as 'PROCESSING' to indicate a reintent
    """
    suboperations = db_nslcmop.get('_admin', {}).get('operations', [])
    if suboperations[op_index].get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. Reintent executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
2309
2310 # Find a sub-operation where all keys in a matching dictionary must match
2311 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """
    Find the first sub-operation that contains all the key/values of 'match'
    :param db_nslcmop: operation record; sub-operations are read from its _admin.operations list
    :param match: dictionary with the key/values that the sub-operation must contain
    :return: index of the first matching sub-operation. SUBOPERATION_STATUS_NOT_FOUND if there is no match, or
        if db_nslcmop or match are empty
    """
    if not db_nslcmop or not match:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    suboperations = db_nslcmop.get('_admin', {}).get('operations', [])
    matching_positions = (
        position for position, suboperation in enumerate(suboperations)
        if all(suboperation.get(key) == wanted for key, wanted in match.items())
    )
    return next(matching_positions, self.SUBOPERATION_STATUS_NOT_FOUND)
2319
2320 # Update status for a sub-operation given its index
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    Write at database the status of a sub-operation, given its index
    :param db_nslcmop: operation record; only its '_id' is used
    :param op_index: index of the sub-operation at _admin.operations
    :param operationState: value for 'operationState'
    :param detailed_status: value for 'detailed-status'
    :return: None
    """
    # Update DB for HA tasks
    q_filter = {'_id': db_nslcmop['_id']}
    sub_op_path = '_admin.operations.{}.'.format(op_index)
    self.db.set_one(
        "nslcmops",
        q_filter=q_filter,
        update_dict={
            sub_op_path + 'operationState': operationState,
            sub_op_path + 'detailed-status': detailed_status,
        },
        fail_on_empty=False)
2330
2331 # Add sub-operation, return the index of the added sub-operation
2332 # Optionally, set operationState, detailed-status, and operationType
2333 # Status and type are currently set for 'scale' sub-operations:
2334 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2335 # 'detailed-status' : status message
2336 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2337 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                      mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                      RO_nsr_id=None, RO_scaling_info=None):
    """
    Add a sub-operation at the end of the _admin.operations list of the operation, both at the record
    received and at database.
    :param db_nslcmop: operation record. Its '_admin' dictionary is created if it does not exist, so that the
        new sub-operation can be found later at this same record
    :param vnf_index: stored as 'member_vnf_index'
    :param vdu_id: stored as 'vdu_id'
    :param vdu_count_index: stored as 'vdu_count_index'
    :param vdu_name: accepted, but it is not stored at the sub-operation
    :param primitive: stored as 'primitive'
    :param mapped_primitive_params: stored as 'primitive_params'
    :param operationState: optional, stored as 'operationState' if not empty
    :param detailed_status: optional, stored as 'detailed-status' if not empty
    :param operationType: optional, stored as 'lcmOperationType' if not empty
    :param RO_nsr_id: optional, stored as 'RO_nsr_id' if not empty
    :param RO_scaling_info: optional, stored as 'RO_scaling_info' if not empty
    :return: index of the added sub-operation. SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    # Get the "_admin" dictionary attached to the record. With get('_admin', {}) a record without '_admin'
    # would receive the sub-operation in a temporary dictionary, lost when this method returns
    db_nslcmop_admin = db_nslcmop.setdefault('_admin', {})
    op_list = db_nslcmop_admin.get('operations')

    new_op = {'member_vnf_index': vnf_index,
              'vdu_id': vdu_id,
              'vdu_count_index': vdu_count_index,
              'primitive': primitive,
              'primitive_params': mapped_primitive_params}
    # optional fields are stored only when they have content
    optional_fields = (
        ('operationState', operationState),
        ('detailed-status', detailed_status),
        ('lcmOperationType', operationType),
        ('RO_nsr_id', RO_nsr_id),
        ('RO_scaling_info', RO_scaling_info),
    )
    new_op.update((key, value) for key, value in optional_fields if value)

    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = [new_op]
        db_nslcmop_admin['operations'] = op_list
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    db_nslcmop_update = {'_admin.operations': op_list}
    self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
    op_index = len(op_list) - 1
    return op_index
2374
2375 # Helper methods for scale() sub-operations
2376
2377 # pre-scale/post-scale:
2378 # Check for 3 different cases:
2379 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2380 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2381 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Look for a scale sub-operation and add it if it does not exist.
    When both RO_nsr_id and RO_scaling_info are provided the sub-operation is a RO scaling, of type
    'SCALE-RO', identified by vnf_index, RO_nsr_id and RO_scaling_info. Otherwise it is a primitive
    sub-operation identified by vnf_index, primitive, primitive params and operationType.
    :param db_nslcmop: operation record
    :param vnf_index: member index of the vnf
    :param vnf_config_primitive: primitive name. Not stored for a RO scaling
    :param primitive_params: primitive parameters. Not stored for a RO scaling
    :param operationType: type of sub-operation, e.g. 'PRE-SCALE', 'POST-SCALE'. Ignored for a RO scaling
    :param RO_nsr_id: optional, needed for a RO scaling
    :param RO_scaling_info: optional, needed for a RO scaling
    :return: SUBOPERATION_STATUS_NEW if the sub-operation did not exist and it has been added in 'PROCESSING'
        state. Otherwise what _reintent_or_skip_suboperation returns for the existing sub-operation
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Existing sub-operation: either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._reintent_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    # vdu_id, vdu_count_index and vdu_name are None for all kind of scaling
    self._add_suboperation(db_nslcmop,
                           vnf_index,
                           None,
                           None,
                           None,
                           vnf_config_primitive,
                           primitive_params,
                           'PROCESSING',  # initial status for sub-operation
                           'In progress',
                           operationType,
                           RO_nsr_id,
                           RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2435
2436 # Function to return execution_environment id
2437
2438 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2439 # TODO vdu_index_count
2440 for vca in vca_deployed_list:
2441 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2442 return vca["ee_id"]
2443
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix prepended to every log message
    :param db_nslcmop: nslcmop record. Sub-operations are added to it and its nsInstanceId selects the nsr to update
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. None is accepted and
        is handled as a descriptor without terminate primitives
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of them at once
    :return: None or exception
    """
    # The caller passes None when the ns/vnf/vdu/kdu has no configuration section; there is nothing to execute
    # in that case, but the execution environment may still have to be destroyed
    terminate_primitives = (config_descriptor or {}).get("terminate-config-primitive")
    vdu_id = vca_deployed.get("vdu_id")
    vdu_count_index = vca_deployed.get("vdu_count_index")
    vdu_name = vca_deployed.get("vdu_name")
    vnf_index = vca_deployed.get("member-vnf-index")
    if terminate_primitives and vca_deployed.get("needed_terminate"):
        # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
        terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
        for seq in terminate_primitives:
            # For each sequence in list, get primitive and call _ns_execute_primitive()
            step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                vnf_index, seq.get("name"))
            self.logger.debug(logging_text + step)
            # Create the primitive for each sequence, i.e. "primitive": "touch"
            primitive = seq.get('name')
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

            # Add sub-operation
            self._add_suboperation(db_nslcmop,
                                   vnf_index,
                                   vdu_id,
                                   vdu_count_index,
                                   vdu_name,
                                   primitive,
                                   mapped_primitive_params)
            # Sub-operations: Call _ns_execute_primitive() instead of action()
            try:
                result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                         mapped_primitive_params)
            except LcmException:
                # this happens when VCA is not deployed. In this case it is not needed to terminate
                continue
            result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
            if result not in result_ok:
                raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                   "error {}".format(seq.get("name"), vnf_index, result_detail))
        # set that this VCA do not need terminated
        db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
        self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if destroy_ee:
        await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2500
2501 async def _delete_N2VC(self, nsr_id: str):
2502 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2503 namespace = "." + nsr_id
2504 await self.n2vc.delete_namespace(namespace=namespace)
2505 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2506
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix prepended to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr record that is updated with the progress
    :param nslcmop_id: id of the operation whose status is updated with the progress
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException when the ns, the nsd or some vnfd could not be deleted from RO
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                               ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # an explicit exception instead of 'assert', which is removed when running with -O
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no 'loop' argument: it was removed from asyncio.sleep in python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
            self.logger.debug(logging_text + failed_detail[-1])
        else:
            failed_detail.append("RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
            self.logger.error(logging_text + failed_detail[-1])

    # Delete nsd. Skipped, as well as the vnfds below, once something has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    # Do not report a successful deletion when something has failed
    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2646
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns: execute the terminate primitives, delete the execution environments and the KDUs, and remove
    the deployment from RO. The final status is always written at database and notified through the message bus
    with the topic "ns" and the key "terminated".
    The operation parameters read from the nslcmop are "timeout_ns_terminate", "skip_terminate_primitives" and
    "autoremove".
    :param nsr_id: id of the ns record to terminate
    :param nslcmop_id: id of the nslcmop record of this terminate operation
    :return: None. Errors are not raised; they are written at the nsr and nslcmop records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to remove; the 'finally' block writes the final status
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        if not operation_params.get("skip_terminate_primitives"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
                    continue
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
                                                               vca_index, False))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                # These tasks have already been waited for. Forget them before leaving, otherwise the 'finally'
                # block would wait for them again and every error would be reported twice
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."
        stage[1] = "Deleting all execution environments."
        self.logger.debug(logging_text + stage[1])

        task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
        # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
        tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                  kdu_instance=kdu_instance))
            elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"):
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                  kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        task_delete_ro = asyncio.ensure_future(
            self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as e:
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = 'Stage: {} {} Detail: {}'.format(stage[0], stage[1], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}'.format(nslcmop_id, error_description_nslcmop)

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                other_update=db_nsr_update
            )
        if db_nslcmop:
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            # operation_params is assigned right after db_nslcmop, so it exists whenever db_nslcmop is set
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2861
2862 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
2863 time_start = time()
2864 error_list = []
2865 pending_tasks = list(created_tasks_info.keys())
2866 num_tasks = len(pending_tasks)
2867 num_done = 0
2868 stage[1] = "{}/{}.".format(num_done, num_tasks)
2869 self._write_op_status(nslcmop_id, stage)
2870 new_error = False
2871 while pending_tasks:
2872 _timeout = timeout + time_start - time()
2873 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
2874 return_when=asyncio.FIRST_COMPLETED)
2875 num_done += len(done)
2876 if not done: # Timeout
2877 for task in pending_tasks:
2878 error_list.append(created_tasks_info[task] + ": Timeout")
2879 break
2880 for task in done:
2881 if task.cancelled():
2882 self.logger.warn(logging_text + created_tasks_info[task] + ": Cancelled")
2883 error_list.append(created_tasks_info[task] + ": Cancelled")
2884 new_error = True
2885 else:
2886 exc = task.exception()
2887 if exc:
2888 error_list.append(created_tasks_info[task] + ": {}".format(exc))
2889 new_error = True
2890 if isinstance(exc, (DbException, N2VCException, ROclient.ROClientException, LcmException)):
2891 self.logger.error(logging_text + created_tasks_info[task] + ": " + str(exc))
2892 else:
2893 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
2894 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
2895 else:
2896 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
2897 stage[1] = "{}/{}.".format(num_done, num_tasks)
2898 if new_error:
2899 stage[1] += "Errors: " + ". ".join(error_list) + "."
2900 new_error = False
2901 if nsr_id: # update also nsr
2902 self.update_db_2("nsrs", nsr_id, {"errorDescription": ". ".join(error_list)})
2903 self._write_op_status(nslcmop_id, stage)
2904 return error_list
2905
2906 @staticmethod
2907 def _map_primitive_params(primitive_desc, params, instantiation_params):
2908 """
2909 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2910 The default-value is used. If it is between < > it look for a value at instantiation_params
2911 :param primitive_desc: portion of VNFD/NSD that describes primitive
2912 :param params: Params provided by user
2913 :param instantiation_params: Instantiation params provided by user
2914 :return: a dictionary with the calculated params
2915 """
2916 calculated_params = {}
2917 for parameter in primitive_desc.get("parameter", ()):
2918 param_name = parameter["name"]
2919 if param_name in params:
2920 calculated_params[param_name] = params[param_name]
2921 elif "default-value" in parameter or "value" in parameter:
2922 if "value" in parameter:
2923 calculated_params[param_name] = parameter["value"]
2924 else:
2925 calculated_params[param_name] = parameter["default-value"]
2926 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2927 and calculated_params[param_name].endswith(">"):
2928 if calculated_params[param_name][1:-1] in instantiation_params:
2929 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2930 else:
2931 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2932 format(calculated_params[param_name], primitive_desc["name"]))
2933 else:
2934 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2935 format(param_name, primitive_desc["name"]))
2936
2937 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2938 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2939 width=256)
2940 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2941 calculated_params[param_name] = calculated_params[param_name][7:]
2942
2943 # add always ns_config_info if primitive name is config
2944 if primitive_desc["name"] == "config":
2945 if "ns_config_info" in instantiation_params:
2946 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2947 return calculated_params
2948
2949 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_name, vdu_count_index, kdu_name=None):
2950 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
2951 for vca in deployed_vca:
2952 if not vca:
2953 continue
2954 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
2955 continue
2956 if vdu_name and vdu_name != vca["vdu_name"]:
2957 continue
2958 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
2959 continue
2960 if kdu_name and kdu_name != vca["kdu_name"]:
2961 continue
2962 break
2963 else:
2964 # vca_deployed not found
2965 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2966 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2967
2968 # get ee_id
2969 ee_id = vca.get("ee_id")
2970 if not ee_id:
2971 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2972 "execution environment"
2973 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2974 return ee_id
2975
2976 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
2977 retries_interval=30) -> (str, str):
2978 try:
2979 if primitive == "config":
2980 primitive_params = {"params": primitive_params}
2981
2982 while retries >= 0:
2983 try:
2984 output = await self.n2vc.exec_primitive(
2985 ee_id=ee_id,
2986 primitive_name=primitive,
2987 params_dict=primitive_params
2988 )
2989 # execution was OK
2990 break
2991 except Exception as e:
2992 retries -= 1
2993 if retries >= 0:
2994 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2995 # wait and retry
2996 await asyncio.sleep(retries_interval, loop=self.loop)
2997 else:
2998 return 'FAIL', 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e)
2999
3000 return 'COMPLETED', output
3001
3002 except LcmException:
3003 raise
3004 except Exception as e:
3005 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3006
3007 async def action(self, nsr_id, nslcmop_id):
3008
3009 # Try to lock HA task here
3010 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3011 if not task_is_locked_by_me:
3012 return
3013
3014 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3015 self.logger.debug(logging_text + "Enter")
3016 # get all needed from database
3017 db_nsr = None
3018 db_nslcmop = None
3019 db_nsr_update = {}
3020 db_nslcmop_update = {}
3021 nslcmop_operation_state = None
3022 nslcmop_operation_state_detail = None
3023 exc = None
3024 try:
3025 # wait for any previous tasks in process
3026 step = "Waiting for previous operations to terminate"
3027 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3028
3029 self._write_ns_status(
3030 nsr_id=nsr_id,
3031 ns_state=None,
3032 current_operation="RUNNING ACTION",
3033 current_operation_id=nslcmop_id
3034 )
3035
3036 step = "Getting information from database"
3037 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3038 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3039
3040 nsr_deployed = db_nsr["_admin"].get("deployed")
3041 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3042 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3043 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3044 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3045 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3046
3047 if vnf_index:
3048 step = "Getting vnfr from database"
3049 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3050 step = "Getting vnfd from database"
3051 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3052 else:
3053 if db_nsr.get("nsd"):
3054 db_nsd = db_nsr.get("nsd") # TODO this will be removed
3055 else:
3056 step = "Getting nsd from database"
3057 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3058
3059 # for backward compatibility
3060 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3061 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3062 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3063 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3064
3065 primitive = db_nslcmop["operationParams"]["primitive"]
3066 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3067
3068 # look for primitive
3069 config_primitive_desc = None
3070 if vdu_id:
3071 for vdu in get_iterable(db_vnfd, "vdu"):
3072 if vdu_id == vdu["id"]:
3073 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
3074 if config_primitive["name"] == primitive:
3075 config_primitive_desc = config_primitive
3076 break
3077 elif kdu_name:
3078 self.logger.debug(logging_text + "Checking actions in KDUs")
3079 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3080 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
3081 if primitive_params:
3082 desc_params.update(primitive_params)
3083 # TODO Check if we will need something at vnf level
3084 index = 0
3085 for kdu in get_iterable(nsr_deployed, "K8s"):
3086 if kdu_name == kdu["kdu-name"]:
3087 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
3088 "path": "_admin.deployed.K8s.{}".format(index)}
3089 if primitive == "upgrade":
3090 if desc_params.get("kdu_model"):
3091 kdu_model = desc_params.get("kdu_model")
3092 del desc_params["kdu_model"]
3093 else:
3094 kdu_model = kdu.get("kdu-model")
3095 parts = kdu_model.split(sep=":")
3096 if len(parts) == 2:
3097 kdu_model = parts[0]
3098
3099 if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
3100 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
3101 kdu_instance=kdu.get("kdu-instance"),
3102 atomic=True, kdu_model=kdu_model,
3103 params=desc_params, db_dict=db_dict,
3104 timeout=300)
3105 elif kdu.get("k8scluster-type")in ("juju-bundle", "juju"):
3106 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
3107 kdu_instance=kdu.get("kdu-instance"),
3108 atomic=True, kdu_model=kdu_model,
3109 params=desc_params, db_dict=db_dict,
3110 timeout=300)
3111
3112 else:
3113 msg = "k8scluster-type not defined"
3114 raise LcmException(msg)
3115
3116 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
3117 break
3118 elif primitive == "rollback":
3119 if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
3120 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
3121 kdu_instance=kdu.get("kdu-instance"),
3122 db_dict=db_dict)
3123 elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"):
3124 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
3125 kdu_instance=kdu.get("kdu-instance"),
3126 db_dict=db_dict)
3127 else:
3128 msg = "k8scluster-type not defined"
3129 raise LcmException(msg)
3130 break
3131 elif primitive == "status":
3132 if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
3133 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
3134 kdu_instance=kdu.get("kdu-instance"))
3135 elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"):
3136 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
3137 kdu_instance=kdu.get("kdu-instance"))
3138 else:
3139 msg = "k8scluster-type not defined"
3140 raise LcmException(msg)
3141 break
3142 index += 1
3143
3144 else:
3145 raise LcmException("KDU '{}' not found".format(kdu_name))
3146 if output:
3147 db_nslcmop_update["detailed-status"] = output
3148 db_nslcmop_update["operationState"] = 'COMPLETED'
3149 db_nslcmop_update["statusEnteredTime"] = time()
3150 else:
3151 db_nslcmop_update["detailed-status"] = ''
3152 db_nslcmop_update["operationState"] = 'FAILED'
3153 db_nslcmop_update["statusEnteredTime"] = time()
3154 return
3155 elif vnf_index:
3156 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3157 if config_primitive["name"] == primitive:
3158 config_primitive_desc = config_primitive
3159 break
3160 else:
3161 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
3162 if config_primitive["name"] == primitive:
3163 config_primitive_desc = config_primitive
3164 break
3165
3166 if not config_primitive_desc:
3167 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3168 format(primitive))
3169
3170 desc_params = {}
3171 if vnf_index:
3172 if db_vnfr.get("additionalParamsForVnf"):
3173 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
3174 if vdu_id:
3175 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3176 if vdur.get("additionalParams"):
3177 desc_params = self._format_additional_params(vdur["additionalParams"])
3178 else:
3179 if db_nsr.get("additionalParamsForNs"):
3180 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
3181
3182 # TODO check if ns is in a proper status
3183 result, detailed_status = await self._ns_execute_primitive(
3184 self._look_for_deployed_vca(nsr_deployed["VCA"],
3185 member_vnf_index=vnf_index,
3186 vdu_id=vdu_id,
3187 vdu_name=vdu_name,
3188 vdu_count_index=vdu_count_index),
3189 primitive=primitive,
3190 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
3191
3192 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
3193 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
3194 db_nslcmop_update["statusEnteredTime"] = time()
3195 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
3196 return # database update is called inside finally
3197
3198 except (DbException, LcmException) as e:
3199 self.logger.error(logging_text + "Exit Exception {}".format(e))
3200 exc = e
3201 except asyncio.CancelledError:
3202 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3203 exc = "Operation was cancelled"
3204 except Exception as e:
3205 exc = traceback.format_exc()
3206 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3207 finally:
3208 if exc and db_nslcmop:
3209 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
3210 "FAILED {}: {}".format(step, exc)
3211 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3212 db_nslcmop_update["statusEnteredTime"] = time()
3213 try:
3214 if db_nslcmop_update:
3215 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3216 if db_nsr:
3217 self._write_ns_status(
3218 nsr_id=nsr_id,
3219 ns_state=None,
3220 current_operation="IDLE",
3221 current_operation_id=None,
3222 other_update=db_nsr_update
3223 )
3224 if exc:
3225 self._write_op_status(
3226 op_id=nslcmop_id,
3227 error_message=nslcmop_operation_state_detail
3228 )
3229 except DbException as e:
3230 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3231 self.logger.debug(logging_text + "Exit")
3232 if nslcmop_operation_state:
3233 try:
3234 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3235 "operationState": nslcmop_operation_state},
3236 loop=self.loop)
3237 except Exception as e:
3238 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3239 self.logger.debug(logging_text + "Exit")
3240 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3241 return nslcmop_operation_state, nslcmop_operation_state_detail
3242
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a scale (SCALE_OUT / SCALE_IN) operation over one scaling-group-descriptor of a VNF.

    Steps, in order:
      1. Take the HA lock for the operation; if it is not obtained the method returns at once.
      2. Read the nslcmop and the nsr from the database and mark the nsr as "scaling".
      3. Build the list of VDUs to create/delete from the vnfd scaling-group-descriptor.
      4. Run the matching pre-scale "scaling-config-action" primitives.
      5. Send the "vdu-scaling" action to RO and poll every 5 seconds, for at most one hour, until it is done
         and the vnfr has been updated.
      6. Run the matching post-scale "scaling-config-action" primitives.
      7. In any case (finally): write the final status at nslcmops/nsrs, send the "scaled" message over
         self.msg and remove the task from self.lcm_tasks.

    Errors raised while scaling are caught, logged and stored as a FAILED operation; they are not propagated.

    :param nsr_id: _id of the nsr (database collection "nsrs") to scale
    :param nslcmop_id: _id of the operation (database collection "nslcmops") that describes the scaling.
        Its operationParams.scaleVnfData provides "scaleVnfType" and "scaleByStepData"
        ("member-vnf-index", "scaling-group-descriptor")
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # previous status is kept to restore it at the end of the operation
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        # after this loop 'scaling_descriptor' is the entry whose name matches the requested group
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: number of scale-out operations currently applied for this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy: the counters are decremented below, while the original counts are needed later
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # last vdur entries are the ones selected for deletion
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the original (not decremented) counters and remove them from the info sent to primitives
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the referenced name; the loop variable is unbound if there is no config-primitive
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second argument while the 'SCALE-RO' call
                    # below does not; confirm against the signature of _check_or_add_scale_suboperation
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if (op_index == self.SUBOPERATION_STATUS_SKIP):
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if (op_index == self.SUBOPERATION_STATUS_NEW):
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_name=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if (op_index == self.SUBOPERATION_STATUS_SKIP):
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if (op_index == self.SUBOPERATION_STATUS_NEW):
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    # no primitive name here: vnf_config_primitive is unbound when no pre-scale action was run
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: wait for the RO action to finish
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    else:
                        # second phase: update the vnfr and wait for the management IP address
                        # NOTE(review): ns_status/ns_status_info are not refreshed in this branch; they keep the
                        # values of the last check_action_status (ACTIVE), so only the ACTIVE arm can be taken.
                        # Presumably a status check of the ns is missing here; confirm
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet: try again at next iteration
                                pass
                        else:
                            assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same argument list as the rest of _update_suboperation_status calls of this method
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the referenced name; the loop variable is unbound if there is no config-primitive
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second argument while the 'SCALE-RO' call
                    # above does not; confirm against the signature of _check_or_add_scale_suboperation
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_name=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update is done inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                # restore previous status, unless the failure happened in the middle of a VCA or RO stage
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")