fix at n2vc_deploy
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
47 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
48 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
49 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
50 timeout_charm_delete = 10 * 60
51 timeout_primitive = 10 * 60 # timeout for primitive execution
52
53 SUBOPERATION_STATUS_NOT_FOUND = -1
54 SUBOPERATION_STATUS_NEW = -2
55 SUBOPERATION_STATUS_SKIP = -3
56 task_name_deploy_vca = "Deploying VCA"
57
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Initializes the base class with the database, filesystem storage and messaging handlers, and builds the
    connectors used by this class: N2VC (juju), the helm and juju K8s connectors, and the RO client
    :param db: database handler, handed to the base class
    :param msg: messaging handler, handed to the base class
    :param fs: filesystem storage handler, handed to the base class
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read here
    :param loop: loop object handed to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy of the VCA section of the configuration
    self.vca_config = config["VCA"].copy()
    vca = self.vca_config

    # create N2VC connector
    vca_url = '{}:{}'.format(vca['host'], vca['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca.get('user', None),
        vca_config=vca,
        on_update_db=self._on_update_n2vc_db
    )

    # K8s connectors: one based on helm, one based on juju
    helm_connector = K8sHelmConnector(
        kubectl_command=vca.get("kubectlpath"),
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterhelm = helm_connector

    juju_connector = K8sJujuConnector(
        kubectl_command=vca.get("kubectlpath"),
        juju_command=vca.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = juju_connector

    # look-up table to select the connector from a type string
    self.k8scluster_map = dict((
        ("helm-chart", helm_connector),
        ("chart", helm_connector),
        ("juju-bundle", juju_connector),
        ("juju", juju_connector),
    ))

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
115
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """
    Stores the RO descriptor at the 'deploymentStatus' field of the nsr.
    Any error while writing is logged as a warning and not raised
    :param nsrs_id: id of the nsr to update
    :param ro_descriptor: content written, as it is, at 'deploymentStatus'
    :return: None
    """
    try:
        # TODO filter RO descriptor fields...

        # write to database
        db_dict = {'deploymentStatus': ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)

    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); use Logger.warning
        self.logger.warning('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
131
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback handed to the N2VC connector (on_update_db). Refreshes at the nsr record:
    - 'vcaStatus' with the status obtained from N2VC
    - the 'configurationStatus' entry of the VCA referenced by path
    - 'nsState' and 'errorDescription', only when the ns is at READY or DEGRADED
    Errors raised while updating are logged as a warning and not raised
    :param table: not used; the record is always read from and written to 'nsrs'
    :param filter: database filter used to read the nsr. Its '_id' is used as nsr id
    :param path: dotted path. Its last item is expected to be the index of the VCA at _admin.deployed.VCA
    :param updated_data: not used
    :return: None
    """
    # bound before the 'try' so that the error handler at the end can always reference it
    nsr_id = None

    # remove last dot from path (if exists)
    if path.endswith('.'):
        path = path[:-1]

    try:
        nsr_id = filter.get('_id')

        # read ns record from database
        nsr = self.db.get_one(table='nsrs', q_filter=filter)
        current_ns_status = nsr.get('nsState')

        # get vca status for NS
        status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)

        # vcaStatus
        db_dict = {'vcaStatus': status_dict}

        # update configurationStatus for this VCA
        try:
            vca_index = int(path[path.rfind(".")+1:])

            vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
            vca_status = vca_list[vca_index].get('status')

            configuration_status_list = nsr.get('configurationStatus')
            config_status = configuration_status_list[vca_index].get('status')

            # A dotted key is used, as in the rest of the updates of this class, to change only the status of
            # this entry. db_dict has no 'configurationStatus' item, so indexing it (as done before) always
            # failed and the status was never written
            config_status_key = 'configurationStatus.{}.status'.format(vca_index)
            if config_status == 'BROKEN' and vca_status != 'failed':
                db_dict[config_status_key] = 'READY'
            elif config_status != 'BROKEN' and vca_status == 'failed':
                db_dict[config_status_key] = 'BROKEN'
        except Exception as e:
            # not update configurationStatus
            self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        is_degraded = False
        if current_ns_status in ('READY', 'DEGRADED'):
            error_description = ''
            # check machines
            for machine_id, machine in (status_dict.get('machines') or {}).items():
                # check machine agent-status
                if machine.get('agent-status'):
                    s = machine.get('agent-status').get('status')
                    if s != 'started':
                        is_degraded = True
                        error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
                # check machine instance status
                if machine.get('instance-status'):
                    s = machine.get('instance-status').get('status')
                    if s != 'running':
                        is_degraded = True
                        error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
            # check applications
            for app_id, app in (status_dict.get('applications') or {}).items():
                # check application status
                if app.get('status'):
                    s = app.get('status').get('status')
                    if s != 'active':
                        is_degraded = True
                        error_description += 'application {} status={} ; '.format(app_id, s)

            if error_description:
                db_dict['errorDescription'] = error_description
            if current_ns_status == 'READY' and is_degraded:
                db_dict['nsState'] = 'DEGRADED'
            if current_ns_status == 'DEGRADED' and not is_degraded:
                db_dict['nsState'] = 'READY'

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); use Logger.warning
        self.logger.warning('Error updating NS state for ns={}: {}'.format(nsr_id, e))

    return
220
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The keys not used by RO are removed and
    the cloud-init (or the content of cloud-init-file) of each vdu is rendered with Jinja2 using additionalParams
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as variables for the cloud-init
    :param nsrId: Id of the NSR. Not used in this method
    :return: copy of vnfd adapted for RO
    :raises LcmException: if a cloud-init variable is not provided at additionalParams, the cloud-init-file
        cannot be read, or the cloud-init content is not a valid Jinja2 template
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # the same environment is valid for parsing the cloud-init of all the vdus
        env = Environment()

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file is located using the storage info of the original vnfd ('_admin' is removed at vnfd_RO)
                base_folder = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                continue

            # every variable used at the template must be supplied by the user
            ast = env.parse(cloud_init_content)
            for var in meta.find_undeclared_variables(ast):
                if not additionalParams or var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            template = Template(cloud_init_content)
            vdu["cloud-init"] = template.render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        # chained, so that the original error is kept at the traceback
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e)) from e
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e)) from e
278
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map each member-vnf-index to its vnfd
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: keys added as 'mgmt_keys' to the vdus whose configuration requires ssh access.
        Can be None
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM or WIM account is not ENABLED, or if an instantiation parameter references
        an element that is not present at the descriptors
    """
    vim_2_RO = {}
    wim_2_RO = {}
    # instantiation parameter key -> key used at RO for the addressing of an interface
    address_keys_2_RO = (("ip-address", "ip_address"), ("mac-address", "mac_address"))
    interface_keys_2_RO = address_keys_2_RO + (("floating-ip-required", "floating-ip"),)
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # returns the RO id of the vim account, caching the result at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # returns the RO id of the wim account (cached at wim_2_RO). Values that are not a string are
        # returned without change
        if not isinstance(wim_account, str):
            return wim_account
        if wim_account in wim_2_RO:
            return wim_2_RO[wim_account]

        db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
        if db_wim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]))
        RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
        wim_2_RO[wim_account] = RO_wim_id
        return RO_wim_id

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile with the keys and values renamed as used at RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = [ds['address'] for ds in RO_ip_profile.pop("dns-server")]
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the n2vc keys at the vdus whose vnf/vdu configuration requires ssh access
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # look for the vdu that owns the management connection point.
                # A vdu without 'interface' is skipped instead of failing when iterating over None
                for vdu_interface in vdu.get("interface", ()):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in nsd.get("constituent-vnfd", ()):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # instantiation parameters provided per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        member_vnf_index = vnf_params["member-vnf-index"]
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            for volume_params in vdu_params.get("volume") or ():
                if volume_params.get("vim-volume-id"):
                    populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "vdus", vdu_params["id"], "devices",
                                                 volume_params["name"], "vim_id"),
                                  volume_params["vim-volume-id"])
            for interface_params in vdu_params.get("interface") or ():
                for osm_key, RO_key in interface_keys_2_RO:
                    if interface_params.get(osm_key):
                        populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "vdus", vdu_params["id"],
                                                     "interfaces", interface_params["name"], RO_key),
                                      interface_params[osm_key])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            for vim_network_key in ("vim-network-name", "vim-network-id"):
                if internal_vld_params.get(vim_network_key):
                    populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "networks",
                                                 internal_vld_params["name"], vim_network_key),
                                  internal_vld_params[vim_network_key])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):
                populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            for osm_key, RO_key in address_keys_2_RO:
                                if icp_params.get(osm_key):
                                    populate_dict(RO_ns_params, ("vnfs", member_vnf_index, "vdus",
                                                                 vdu_descriptor["id"], "interfaces",
                                                                 vdu_interface["name"], RO_key),
                                                  icp_params[osm_key])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # all the vdus have been inspected without finding the connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # instantiation parameters provided per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        vld_name = vld_params["name"]
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_name, "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):
            populate_dict(RO_ns_params, ("networks", vld_name, "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_name, "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))

        # 'vim-network-id' is processed after 'vim-network-name'. Both are written at the same 'sites' key
        for vim_network_key in ("vim-network-name", "vim-network-id"):
            vim_network = vld_params.get(vim_network_key)
            if not vim_network:
                continue
            if isinstance(vim_network, dict):
                RO_vld_sites = [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                                for vim_account, vim_net in vim_network.items()]
            else:  # isinstance str
                RO_vld_sites = [{"netmap-use": vim_network}]
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_name, "sites"), RO_vld_sites)

        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_name, "use-network"), RO_vld_ns_net)

        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor are the ones that matched at the search above
                for osm_key, RO_key in address_keys_2_RO:
                    if cp_params.get(osm_key):
                        populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                     vdu_descriptor["id"], "interfaces",
                                                     interface_descriptor["name"], RO_key),
                                      cp_params[osm_key])
    return RO_ns_params
549
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes 'vdur' entries of a vnfr, updating both db_vnfr and the database.
    New entries are a copy of the last existing entry with the same vdu-id-ref, with a new '_id' and the
    'count-index' increased. Entries are removed starting from the last one. Entries with 'pdu-type' are ignored
    :param db_vnfr: content of the vnfr. Its 'vdur' list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of entries to create. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of entries to delete. It is not modified
    :return: None
    :raises LcmException: if some creation or deletion cannot be done because there is not any (more) entry
        with the requested vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # backwards, so that insertions and deletions do not move the entries pending to be visited
    for vdu_index in range(len(vdurs) - 1, -1, -1):
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(vdu_create[vdu_id_ref]):
                # each new entry is a copy of the previous one, so the count-index is consecutive
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index + 1 + index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Only the pending creations are reported as scaling OUT errors, so that
    # the pending deletions get their own scaling IN error
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
588
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into each vld of the nsr the info that RO reports for its net (vim id, name, status and error)
    :param ns_update_nsr: dictionary where the modified vlds are added with the key 'vld.<index>'
    :param db_nsr: content of db_nsr. Its vlds are modified
    :param nsr_desc_RO: nsr descriptor from RO. Its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised if a vld has not any net at the RO info
    """
    position = 0
    for nsr_vld in get_iterable(db_nsr, "vld"):
        # first net of RO that refers to this vld
        matching_net = None
        for candidate in get_iterable(nsr_desc_RO, "nets"):
            if nsr_vld["id"] == candidate.get("ns_net_osm_id"):
                matching_net = candidate
                break
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(nsr_vld["id"]))

        nsr_vld.update({
            "vim-id": matching_net.get("vim_net_id"),
            "name": matching_net.get("vim_name"),
            "status": matching_net.get("status"),
            "status-detailed": matching_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = nsr_vld
        position += 1
610
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Sets at ERROR status every vnfr, and those of their vdur that have not any status yet, both at db_vnfrs
    and at the database. A DbException while writing is logged and not raised
    :param db_vnfrs: dictionary whose values are the vnfr contents. Its vdur are modified
    :param error_text: if provided, it is stored as 'status-detailed' of the vdur set at ERROR
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # the same text goes to memory and to database (the database got a literal "ERROR")
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
625
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Refreshes every vnfr, at db_vnfrs and at the database, with the info that RO reports for it: management
    ip address, and the vim id, name, status, error and addresses of its vdur, interfaces and vld
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content. The content is modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised when a vnf, vdur, interface or vld is not found at the RO info.
        LcmExceptionNoMgmtIP is raised when a vnfr with vdur has not any ip address
    """
    for member_index, vnfr in db_vnfrs.items():
        for ro_vnf in nsr_desc_RO["vnfs"]:
            if ro_vnf["member_vnf_index"] != member_index:
                continue
            changes = {}

            # management ip address: the first one of the ';' separated list provided by RO
            if ro_vnf.get("ip_address"):
                mgmt_ip = ro_vnf["ip_address"].split(";")[0]
                vnfr["ip-address"] = mgmt_ip
                changes["ip-address"] = mgmt_ip
            elif not vnfr.get("ip-address") and vnfr.get("vdur"):
                # if not VDUs, there is not ip_address
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(member_index))

            for position, vdur in enumerate(get_iterable(vnfr, "vdur")):
                # number of vms of RO with the same vdu id that have been skipped
                replicas_seen = 0
                if vdur.get("pdu-type"):
                    continue
                for ro_vm in get_iterable(ro_vnf, "vms"):
                    if vdur["vdu-id-ref"] != ro_vm["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != replicas_seen:
                        replicas_seen += 1
                        continue

                    ro_vm_ip = ro_vm.get("ip_address")
                    vdur.update({
                        "vim-id": ro_vm.get("vim_vm_id"),
                        "ip-address": ro_vm_ip.split(";")[0] if ro_vm_ip else None,
                        "vdu-id-ref": ro_vm.get("vdu_osm_id"),
                        "name": ro_vm.get("vim_name"),
                        "status": ro_vm.get("status"),
                        "status-detailed": ro_vm.get("error_msg"),
                    })
                    for vdur_iface in get_iterable(vdur, "interfaces"):
                        for ro_iface in get_iterable(ro_vm, "interfaces"):
                            if vdur_iface["name"] == ro_iface.get("internal_name"):
                                vdur_iface["ip-address"] = ro_iface.get("ip_address")
                                vdur_iface["mac-address"] = ro_iface.get("mac_address")
                                break
                        else:
                            raise LcmException(
                                "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} from VIM info"
                                .format(member_index, vdur["vdu-id-ref"], vdur_iface["name"]))
                    changes["vdur.{}".format(position)] = vdur
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from VIM info"
                        .format(member_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for position, vnfr_vld in enumerate(get_iterable(vnfr, "vld")):
                for ro_net in get_iterable(nsr_desc_RO, "nets"):
                    if vnfr_vld["id"] != ro_net.get("vnf_net_osm_id"):
                        continue
                    vnfr_vld.update({
                        "vim-id": ro_net.get("vim_net_id"),
                        "name": ro_net.get("vim_name"),
                        "status": ro_net.get("status"),
                        "status-detailed": ro_net.get("error_msg"),
                    })
                    changes["vld.{}".format(position)] = vnfr_vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        member_index, vnfr_vld["id"]))

            self.update_db_2("vnfrs", vnfr["_id"], changes)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(member_index))
698
def _get_ns_config_info(self, nsr_id):
    """
    Builds the mapping between the vnf and vdu elements and their N2VC application
    :param nsr_id: id of the nsr. Its _admin.deployed.VCA list is read from the database
    :return: a dictionary with {osm-config-mapping: {}} where each element is:
        "<member-vnf-index>": <application> for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <application> for a vdu configuration
        Entries of the VCA list without member-vnf-index are not included
    """
    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    osm_config_mapping = {}
    for deployed in nsr["_admin"]["deployed"]["VCA"]:
        vnf_index = deployed["member-vnf-index"]
        if not vnf_index:
            continue
        vdu_id = deployed["vdu_id"]
        if vdu_id:
            element = "{}.{}.{}".format(vnf_index, vdu_id, deployed["vdu_count_index"])
        else:
            element = vnf_index
        osm_config_mapping[element] = deployed["application"]
    return {"osm-config-mapping": osm_config_mapping}
720
@staticmethod
def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
    """
    Builds the list of initial-config-primitive to apply, starting from the list of the descriptor and adding
    the internal primitives 'config' and 'verify-ssh-credentials' when they are needed
    :param desc_primitive_list: list of primitives of the descriptor. Can be None. It is not modified
    :param vca_deployed: information of the deployed element. An empty 'member-vnf-index' means that it is
        related to the NS; otherwise to a VNF or VDU. 'ssh-public-key' is checked for VNF/VDU
    :return: A new list. Can be an empty list, but always a list
    """
    primitives = desc_primitive_list.copy() if desc_primitive_list else []

    # position of the first primitive called 'config'. None if not present
    config_at = next((pos for pos, item in enumerate(primitives) if item["name"] == "config"), None)

    if vca_deployed["member-vnf-index"]:
        # for VNF/VDU add verify-ssh-credentials after config
        if config_at is not None and vca_deployed.get("ssh-public-key"):
            primitives.insert(config_at + 1, {"name": "verify-ssh-credentials", "parameter": []})
    elif config_at is None:
        # for NS, add always a config primitive if not present (bug 874)
        primitives.insert(0, {"name": "config", "parameter": []})
    return primitives
750
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Create the vnfd, nsd and ns at RO and poll RO until the ns is ACTIVE and the VNFRs could be updated.

    :param logging_text: prefix used for logging
    :param nsr_id: ns record id; also used as prefix of the descriptor ids created at RO
    :param nsd: ns descriptor content
    :param db_nsr: nsrs database record. Its "_admin.deployed.RO.vnfd" list is modified in place
    :param db_nslcmop: nslcmops database record; "operationParams" provides the instantiation parameters
    :param db_vnfrs: vnfrs records indexed by member-vnf-index
    :param db_vnfds_ref: vnfd contents indexed by the "vnfd-id-ref" used at the nsd
    :param n2vc_key_list: public keys handed to ns_params_2_RO
    :param stage: list of status texts; this method overwrites stage[2] with the current step
    :return: None
    :raises ROclient.ROClientException: RO reports ERROR or the ns is not ready before timeout_ns_deploy.
        ROClientException, LcmException and DbException are re-raised after calling set_vnfr_at_error
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0   # number of descriptors created at RO
        vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # timeout provided at the operation has preference over the configured/default one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        await self.do_placement(logging_text, db_nslcmop, db_vnfrs)

        # deploy RO

        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                # placeholder, replaced below once the RO id is known
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}'  member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # work over a copy: the ids are replaced by the ones used at RO
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                # a NOT_FOUND means that the stored id is stale; any other error is propagated
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait  look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                        if task_dependency:
                            stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                            self.logger.debug(logging_text + stage[2])
                            await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

            ns_status, ns_status_info = self.RO.check_ns_status(desc)
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                stage[2] = "VIM: ({})".format(ns_status_info)
            elif ns_status == "ACTIVE":
                stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                try:
                    self.ns_update_vnfr(db_vnfrs, desc)
                    break
                except LcmExceptionNoMgmtIP:
                    # management IP not available yet: keep polling
                    pass
            else:
                # explicit raise instead of "assert False": the check must survive "python -O"
                raise AssertionError("ROclient.check_ns_status returns unknown {}".format(ns_status))
            # write at database only when the reported status text changes
            if stage[2] != detailed_status_old:
                detailed_status_old = stage[2]
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
            # no "loop" argument: it was removed from asyncio.sleep at python 3.10
            await asyncio.sleep(5)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException) as e:
        stage[2] = "ERROR deployig at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
969
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for the ip address at the vnfr record and, optionally, insert a public key in the virtual machine
    The database is polled every 10 seconds; after 360 iterations (1 hour) an LcmException is raised
    :param logging_text: prefix used for logging
    :param nsr_id: ns record id, used to obtain the ns id at RO
    :param vnfr_id: vnf record id where the target vdu is looked for
    :param vdu_id: target vdu. If empty the vdu owning the vnf management ip-address is used
    :param vdu_index: count-index of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: target VNF/VM in error state, vdu not found, unknown answer from RO,
        maximum number of tries (20) injecting the key or maximum number of iterations reached
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # no "loop" argument: it was removed from asyncio.sleep at python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        # any other vdu status: not ready yet, poll again
        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                # one successful result is enough; otherwise report the description of the last one
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                else:
                    raise ROclient.ROClientException("error injecting key: {}".format(
                        result.get("description")))
                break
            except ROclient.ROClientException as e:
                # RO errors are retried at next iterations; only the first one is logged
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            break

    return ip_address
1071
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the VCA deployments this one depends on are READY. NS wait for VNFs and VDUs. VNFs for VDUs
    :param nsr_id: ns record id, read from database at every poll
    :param vca_deployed_list: list of deployed VCA; only the entry at vca_index is used
    :param vca_index: position of the own VCA at the list (and at nsrs "configurationStatus")
    :return: None when there are no pending dependencies
    :raises LcmException: a dependency is BROKEN, or dependencies are not READY after all the polls
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    # NOTE: the counter decreases by one per 10 seconds poll, so up to 301 polls are done
    remaining = 300
    while remaining >= 0:
        nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = nsr_record["_admin"]["deployed"]["VCA"]
        status_entries = nsr_record["configurationStatus"]

        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            # a NS level VCA (no member-vnf-index) depends on everything;
            # a VNF level one only on the entries of its same member-vnf-index
            if own_vca.get("member-vnf-index") and \
                    entry.get("member-vnf-index") != own_vca.get("member-vnf-index"):
                continue
            entry_status = status_entries[position].get("status")
            if entry_status == 'READY':
                continue
            if entry_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            pending = True
            break

        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        remaining -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1105
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
    """
    Create (proxy charm) or register (native charm) the execution environment of one VCA, install the
    configuration software on it and run its initial config primitives.

    :param logging_text: prefix used for logging
    :param vca_index: position of this VCA at db_nsr "_admin.deployed.VCA" and at nsrs "configurationStatus"
    :param nsi_id: network slice instance id, used as first part of the namespace. Can be empty
    :param db_nsr: nsrs database record
    :param db_vnfr: vnfrs database record, empty for a NS level configuration
    :param vdu_id: vdu identifier, empty for VNF or NS level configuration
    :param kdu_name: not used inside this method
    :param vdu_index: vdu count index; 0 is used at the namespace when empty
    :param config_descriptor: configuration section of the descriptor ("juju", "config-access",
        "initial-config-primitive", "terminate-config-primitive")
    :param deploy_params: parameters for primitives; "rw_mgmt_ip" (and "ns_config_info") are added here
    :param base_folder: dictionary with "folder" and "pkg-dir" used to locate the charm
    :param nslcmop_id: operation id where the stage is written
    :param stage: list of status texts; stage[0] is overwritten before running the primitives
    :return: None
    :raises LcmException: on any failure, after setting the configuration status to BROKEN
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""   # description of the current step, used at the error message
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace is "<nsi>.<ns>[.<vnfr>[.<vdu>-<index>]]"
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless "juju.proxy" is explicitly False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # best effort: a wrong or missing 'seq' leaves the list order as it is
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # ".get" as at the stage selection above: a record without the key is a NS level one
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # expected exception types are reported by the caller; the rest are logged here with traceback
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1355
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    A database error is not propagated; it is only logged as a warning.
    :param nsr_id: ns record to update
    :param ns_state: new "nsState". Not written if empty
    :param current_operation: written at "currentOperation". "IDLE" stores None at "_admin.operation-type"
    :param current_operation_id: written at "currentOperationID", "_admin.current-operation" and "_admin.nslcmop"
    :param error_description: written at "errorDescription" (None included)
    :param error_detail: written at "errorDetail" (None included)
    :param other_update: Other required changes at database if provided, will be cleared. When not empty, this
        same dictionary is extended with the status fields and handed to update_db_2
    :return: None
    """
    try:
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id   # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13)
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1384
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the status fields of a nslcmops record.
    A database error is not propagated; it is only logged as a warning.
    :param op_id: nslcmop to update
    :param stage: a list writes its first item at "stage" and all of them, joined by spaces, at
        "detailed-status". Any other not None value is written at "stage" as a string
    :param error_message: written at "errorMessage" if not None
    :param queuePosition: always written at "queuePosition"
    :param operation_state: if not None, written at "operationState" together with "statusEnteredTime"
    :param other_update: other changes for the same record. When not empty, this same dictionary is extended
        with the status fields and handed to update_db_2
    :return: None
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13)
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1404
def _write_all_config_status(self, nsr_id: str, status: str):
    """
    Set the same status to every entry of "configurationStatus" of a nsrs record.
    Nothing is written if the record has not a (non empty) "configurationStatus".
    A database error is not propagated; it is only logged as a warning.
    :param nsr_id: ns record to read and update
    :param status: value written at the "status" of each entry
    :return: None
    """
    try:
        # nsrs record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # update status of every entry and write the whole list back
            for entry in config_status:
                entry['status'] = status
            self.update_db_2("nsrs", nsr_id, {'configurationStatus': list(config_status)})

    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13)
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1422
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None):
    """
    Update one entry of "configurationStatus" of a nsrs record. Only the provided (non empty) values
    are included in the update.
    A database error is not propagated; it is only logged as a warning.
    :param nsr_id: ns record to update
    :param vca_index: position of the entry at the "configurationStatus" list
    :param status: written at "configurationStatus.<vca_index>.status"
    :param element_under_configuration: written at "configurationStatus.<vca_index>.elementUnderConfiguration"
    :param element_type: written at "configurationStatus.<vca_index>.elementType"
    :return: None
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = dict()
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13)
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1442
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement to PLA, only if "operationParams.placement-engine" of the operation is "PLA", and
    store the resulting vim account of each vnf at the vnfrs database records.
    The request is sent through the message bus (topic "pla", command "get_placement"); the answer is
    looked for at "_admin.pla" of the nslcmops record, polling the database every 5 seconds.
    :param logging_text: prefix used for logging
    :param db_nslcmop: nslcmops database record of the operation
    :param db_vnfrs: vnfrs records indexed by member-vnf-index. They are not modified, only database is
    :return: None
    :raises LcmException: if the placement result is not at database when the waiting time expires
    """
    placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
    if placement_engine != "PLA":
        return

    # "_id" is the key used for every database access, so it is also used at the messages
    nslcmop_id = db_nslcmop['_id']
    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(nslcmop_id))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
    db_poll_interval = 5
    wait = db_poll_interval * 4
    pla_result = None
    while not pla_result and wait >= 0:
        await asyncio.sleep(db_poll_interval)
        wait -= db_poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    for pla_vnf in pla_result['vnf']:
        vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
        # ignore results without vim account or for a vnf that is not part of this ns
        if not pla_vnf.get('vimAccountId') or not vnfr:
            continue
        self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
    return
1466
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at "_admin.pla" of the nslcmops record referenced by it.
    Any failure is not propagated; it is only logged as a warning.
    :param params: dictionary with a "placement" entry, which contains the "nslcmopId" of the operation
    :return: None
    """
    # bound before the try, because it is used at the warning of the except clause
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias (removed at python 3.13)
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1473
1474 async def instantiate(self, nsr_id, nslcmop_id):
1475 """
1476
1477 :param nsr_id: ns instance to deploy
1478 :param nslcmop_id: operation to run
1479 :return:
1480 """
1481
1482 # Try to lock HA task here
1483 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1484 if not task_is_locked_by_me:
1485 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1486 return
1487
1488 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1489 self.logger.debug(logging_text + "Enter")
1490
1491 # get all needed from database
1492
1493 # database nsrs record
1494 db_nsr = None
1495
1496 # database nslcmops record
1497 db_nslcmop = None
1498
1499 # update operation on nsrs
1500 db_nsr_update = {}
1501 # update operation on nslcmops
1502 db_nslcmop_update = {}
1503
1504 nslcmop_operation_state = None
1505 db_vnfrs = {} # vnf's info indexed by member-index
1506 # n2vc_info = {}
1507 tasks_dict_info = {} # from task to info text
1508 exc = None
1509 error_list = []
1510 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1511 # ^ stage, step, VIM progress
1512 try:
1513 # wait for any previous tasks in process
1514 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1515
1516 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1517 stage[1] = "Reading from database,"
1518 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1519 db_nsr_update["detailed-status"] = "creating"
1520 db_nsr_update["operational-status"] = "init"
1521 self._write_ns_status(
1522 nsr_id=nsr_id,
1523 ns_state="BUILDING",
1524 current_operation="INSTANTIATING",
1525 current_operation_id=nslcmop_id,
1526 other_update=db_nsr_update
1527 )
1528 self._write_op_status(
1529 op_id=nslcmop_id,
1530 stage=stage,
1531 queuePosition=0
1532 )
1533
1534 # read from db: operation
1535 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1536 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1537 ns_params = db_nslcmop.get("operationParams")
1538 if ns_params and ns_params.get("timeout_ns_deploy"):
1539 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1540 else:
1541 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1542
1543 # read from db: ns
1544 stage[1] = "Getting nsr={} from db".format(nsr_id)
1545 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1546 # nsd is replicated into ns (no db read)
1547 nsd = db_nsr["nsd"]
1548 # nsr_name = db_nsr["name"] # TODO short-name??
1549
1550 # read from db: vnf's of this ns
1551 stage[1] = "Getting vnfrs from db"
1552 self.logger.debug(logging_text + stage[1])
1553 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1554
1555 # read from db: vnfd's for every vnf
1556 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1557 db_vnfds = {} # every vnfd data indexed by vnf id
1558 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1559
1560 # for each vnf in ns, read vnfd
1561 for vnfr in db_vnfrs_list:
1562 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1563 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1564 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1565 # if we haven't this vnfd, read it from db
1566 if vnfd_id not in db_vnfds:
1567 # read from db
1568 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1569 self.logger.debug(logging_text + stage[1])
1570 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1571
1572 # store vnfd
1573 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1574 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1575 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1576
1577 # Get or generates the _admin.deployed.VCA list
1578 vca_deployed_list = None
1579 if db_nsr["_admin"].get("deployed"):
1580 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1581 if vca_deployed_list is None:
1582 vca_deployed_list = []
1583 configuration_status_list = []
1584 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1585 db_nsr_update["configurationStatus"] = configuration_status_list
1586 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1587 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1588 elif isinstance(vca_deployed_list, dict):
1589 # maintain backward compatibility. Change a dict to list at database
1590 vca_deployed_list = list(vca_deployed_list.values())
1591 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1592 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1593
1594 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1595 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1596 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1597
1598 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1599 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1600 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1601
1602 # n2vc_redesign STEP 2 Deploy Network Scenario
1603 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1604 self._write_op_status(
1605 op_id=nslcmop_id,
1606 stage=stage
1607 )
1608
1609 stage[1] = "Deploying KDUs,"
1610 # self.logger.debug(logging_text + "Before deploy_kdus")
1611 # Call to deploy_kdus in case exists the "vdu:kdu" param
1612 await self.deploy_kdus(
1613 logging_text=logging_text,
1614 nsr_id=nsr_id,
1615 nslcmop_id=nslcmop_id,
1616 db_vnfrs=db_vnfrs,
1617 db_vnfds=db_vnfds,
1618 task_instantiation_info=tasks_dict_info,
1619 )
1620
1621 stage[1] = "Getting VCA public key."
1622 # n2vc_redesign STEP 1 Get VCA public ssh-key
1623 # feature 1429. Add n2vc public key to needed VMs
1624 n2vc_key = self.n2vc.get_public_key()
1625 n2vc_key_list = [n2vc_key]
1626 if self.vca_config.get("public_key"):
1627 n2vc_key_list.append(self.vca_config["public_key"])
1628
1629 stage[1] = "Deploying NS at VIM."
1630 task_ro = asyncio.ensure_future(
1631 self.instantiate_RO(
1632 logging_text=logging_text,
1633 nsr_id=nsr_id,
1634 nsd=nsd,
1635 db_nsr=db_nsr,
1636 db_nslcmop=db_nslcmop,
1637 db_vnfrs=db_vnfrs,
1638 db_vnfds_ref=db_vnfds_ref,
1639 n2vc_key_list=n2vc_key_list,
1640 stage=stage
1641 )
1642 )
1643 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1644 tasks_dict_info[task_ro] = "Deploying at VIM"
1645
1646 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1647 stage[1] = "Deploying Execution Environments."
1648 self.logger.debug(logging_text + stage[1])
1649
1650 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1651 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1652 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1653 vnfd_id = c_vnf["vnfd-id-ref"]
1654 vnfd = db_vnfds_ref[vnfd_id]
1655 member_vnf_index = str(c_vnf["member-vnf-index"])
1656 db_vnfr = db_vnfrs[member_vnf_index]
1657 base_folder = vnfd["_admin"]["storage"]
1658 vdu_id = None
1659 vdu_index = 0
1660 vdu_name = None
1661 kdu_name = None
1662
1663 # Get additional parameters
1664 deploy_params = {}
1665 if db_vnfr.get("additionalParamsForVnf"):
1666 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1667
1668 descriptor_config = vnfd.get("vnf-configuration")
1669 if descriptor_config and descriptor_config.get("juju"):
1670 self._deploy_n2vc(
1671 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1672 db_nsr=db_nsr,
1673 db_vnfr=db_vnfr,
1674 nslcmop_id=nslcmop_id,
1675 nsr_id=nsr_id,
1676 nsi_id=nsi_id,
1677 vnfd_id=vnfd_id,
1678 vdu_id=vdu_id,
1679 kdu_name=kdu_name,
1680 member_vnf_index=member_vnf_index,
1681 vdu_index=vdu_index,
1682 vdu_name=vdu_name,
1683 deploy_params=deploy_params,
1684 descriptor_config=descriptor_config,
1685 base_folder=base_folder,
1686 task_instantiation_info=tasks_dict_info,
1687 stage=stage
1688 )
1689
1690 # Deploy charms for each VDU that supports one.
1691 for vdud in get_iterable(vnfd, 'vdu'):
1692 vdu_id = vdud["id"]
1693 descriptor_config = vdud.get('vdu-configuration')
1694 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1695 if vdur.get("additionalParams"):
1696 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1697 else:
1698 deploy_params_vdu = deploy_params
1699 if descriptor_config and descriptor_config.get("juju"):
1700 # look for vdu index in the db_vnfr["vdu"] section
1701 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1702 # if vdur["vdu-id-ref"] == vdu_id:
1703 # break
1704 # else:
1705 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1706 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1707 # vdu_name = vdur.get("name")
1708 vdu_name = None
1709 kdu_name = None
1710 for vdu_index in range(int(vdud.get("count", 1))):
1711 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1712 self._deploy_n2vc(
1713 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1714 member_vnf_index, vdu_id, vdu_index),
1715 db_nsr=db_nsr,
1716 db_vnfr=db_vnfr,
1717 nslcmop_id=nslcmop_id,
1718 nsr_id=nsr_id,
1719 nsi_id=nsi_id,
1720 vnfd_id=vnfd_id,
1721 vdu_id=vdu_id,
1722 kdu_name=kdu_name,
1723 member_vnf_index=member_vnf_index,
1724 vdu_index=vdu_index,
1725 vdu_name=vdu_name,
1726 deploy_params=deploy_params_vdu,
1727 descriptor_config=descriptor_config,
1728 base_folder=base_folder,
1729 task_instantiation_info=tasks_dict_info,
1730 stage=stage
1731 )
1732 for kdud in get_iterable(vnfd, 'kdu'):
1733 kdu_name = kdud["name"]
1734 descriptor_config = kdud.get('kdu-configuration')
1735 if descriptor_config and descriptor_config.get("juju"):
1736 vdu_id = None
1737 vdu_index = 0
1738 vdu_name = None
1739 # look for vdu index in the db_vnfr["vdu"] section
1740 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1741 # if vdur["vdu-id-ref"] == vdu_id:
1742 # break
1743 # else:
1744 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1745 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1746 # vdu_name = vdur.get("name")
1747 # vdu_name = None
1748
1749 self._deploy_n2vc(
1750 logging_text=logging_text,
1751 db_nsr=db_nsr,
1752 db_vnfr=db_vnfr,
1753 nslcmop_id=nslcmop_id,
1754 nsr_id=nsr_id,
1755 nsi_id=nsi_id,
1756 vnfd_id=vnfd_id,
1757 vdu_id=vdu_id,
1758 kdu_name=kdu_name,
1759 member_vnf_index=member_vnf_index,
1760 vdu_index=vdu_index,
1761 vdu_name=vdu_name,
1762 deploy_params=deploy_params,
1763 descriptor_config=descriptor_config,
1764 base_folder=base_folder,
1765 task_instantiation_info=tasks_dict_info,
1766 stage=stage
1767 )
1768
1769 # Check if this NS has a charm configuration
1770 descriptor_config = nsd.get("ns-configuration")
1771 if descriptor_config and descriptor_config.get("juju"):
1772 vnfd_id = None
1773 db_vnfr = None
1774 member_vnf_index = None
1775 vdu_id = None
1776 kdu_name = None
1777 vdu_index = 0
1778 vdu_name = None
1779
1780 # Get additional parameters
1781 deploy_params = {}
1782 if db_nsr.get("additionalParamsForNs"):
1783 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1784 base_folder = nsd["_admin"]["storage"]
1785 self._deploy_n2vc(
1786 logging_text=logging_text,
1787 db_nsr=db_nsr,
1788 db_vnfr=db_vnfr,
1789 nslcmop_id=nslcmop_id,
1790 nsr_id=nsr_id,
1791 nsi_id=nsi_id,
1792 vnfd_id=vnfd_id,
1793 vdu_id=vdu_id,
1794 kdu_name=kdu_name,
1795 member_vnf_index=member_vnf_index,
1796 vdu_index=vdu_index,
1797 vdu_name=vdu_name,
1798 deploy_params=deploy_params,
1799 descriptor_config=descriptor_config,
1800 base_folder=base_folder,
1801 task_instantiation_info=tasks_dict_info,
1802 stage=stage
1803 )
1804
1805 # rest of staff will be done at finally
1806
1807 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1808 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1809 exc = e
1810 except asyncio.CancelledError:
1811 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1812 exc = "Operation was cancelled"
1813 except Exception as e:
1814 exc = traceback.format_exc()
1815 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1816 finally:
1817 if exc:
1818 error_list.append(str(exc))
1819 try:
1820 # wait for pending tasks
1821 if tasks_dict_info:
1822 stage[1] = "Waiting for instantiate pending tasks."
1823 self.logger.debug(logging_text + stage[1])
1824 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1825 stage, nslcmop_id, nsr_id=nsr_id)
1826 stage[1] = stage[2] = ""
1827 except asyncio.CancelledError:
1828 error_list.append("Cancelled")
1829 # TODO cancel all tasks
1830 except Exception as exc:
1831 error_list.append(str(exc))
1832
1833 # update operation-status
1834 db_nsr_update["operational-status"] = "running"
1835 # let's begin with VCA 'configured' status (later we can change it)
1836 db_nsr_update["config-status"] = "configured"
1837 for task, task_name in tasks_dict_info.items():
1838 if not task.done() or task.cancelled() or task.exception():
1839 if task_name.startswith(self.task_name_deploy_vca):
1840 # A N2VC task is pending
1841 db_nsr_update["config-status"] = "failed"
1842 else:
1843 # RO or KDU task is pending
1844 db_nsr_update["operational-status"] = "failed"
1845
1846 # update status at database
1847 if error_list:
1848 error_detail = ". ".join(error_list)
1849 self.logger.error(logging_text + error_detail)
1850 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1851 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1852
1853 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1854 db_nslcmop_update["detailed-status"] = error_detail
1855 nslcmop_operation_state = "FAILED"
1856 ns_state = "BROKEN"
1857 else:
1858 error_detail = None
1859 error_description_nsr = error_description_nslcmop = None
1860 ns_state = "READY"
1861 db_nsr_update["detailed-status"] = "Done"
1862 db_nslcmop_update["detailed-status"] = "Done"
1863 nslcmop_operation_state = "COMPLETED"
1864
1865 if db_nsr:
1866 self._write_ns_status(
1867 nsr_id=nsr_id,
1868 ns_state=ns_state,
1869 current_operation="IDLE",
1870 current_operation_id=None,
1871 error_description=error_description_nsr,
1872 error_detail=error_detail,
1873 other_update=db_nsr_update
1874 )
1875 if db_nslcmop:
1876 self._write_op_status(
1877 op_id=nslcmop_id,
1878 stage="",
1879 error_message=error_description_nslcmop,
1880 operation_state=nslcmop_operation_state,
1881 other_update=db_nslcmop_update,
1882 )
1883
1884 if nslcmop_operation_state:
1885 try:
1886 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1887 "operationState": nslcmop_operation_state},
1888 loop=self.loop)
1889 except Exception as e:
1890 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1891
1892 self.logger.debug(logging_text + "Exit")
1893 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1894
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations in which the VCA stored at _admin.deployed.VCA.<vca_index> takes part.

    Steps:
        1. find all the relations (ns-configuration of the nsd and vnf-configuration of each vnfd) where this
           VCA is one of the two entities
        2. wait for the peers: a relation is added once both peers have 'config_sw_installed' set
        3. add the relation by calling self.n2vc.add_relation. A relation is discarded when one of its peers
           is marked as BROKEN at 'configurationStatus'

    The ns record is reloaded from database every 5 seconds until there are no pending relations.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the ns record at database
    :param vca_index: index of this VCA at the _admin.deployed.VCA list of the ns record
    :param timeout: maximum time in seconds waiting for the peers
    :return: True if there are no relations or all of them have been added or discarded. False in case of
        timeout or any exception (which is logged, not raised)
    """

    def _is_involved(relation, entity_id):
        # True if entity_id is one of the two peers of the relation
        entities = relation.get('entities')
        return entity_id in (entities[0].get('id'), entities[1].get('id'))

    def _peer_status(status_list, index):
        # best effort: the status list can be missing or shorter than the VCA list
        try:
            return (status_list[index] or {}).get('status')
        except (IndexError, KeyError, TypeError, AttributeError):
            return None

    async def _add_ready_relations(db_nsr, relations, vca_key):
        # Adds the relations whose two peers are ready. Returns the list of relations to retry later.
        # A new list is built instead of removing items from the list being iterated, which skips entries
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        vca_status_list = db_nsr.get('configurationStatus')
        pending = []
        for relation in relations:
            from_entity = relation.get('entities')[0]
            to_entity = relation.get('entities')[1]
            from_vca_ee_id = None
            to_vca_ee_id = None
            peer_broken = False
            for index, vca in enumerate(vca_list):
                if not vca:
                    # empty placeholder at the deployed list
                    continue
                is_from = vca.get(vca_key) == from_entity.get('id')
                is_to = vca.get(vca_key) == to_entity.get('id')
                if not (is_from or is_to):
                    continue
                if vca.get('config_sw_installed'):
                    if is_from:
                        from_vca_ee_id = vca.get('ee_id')
                    if is_to:
                        to_vca_ee_id = vca.get('ee_id')
                if _peer_status(vca_status_list, index) == 'BROKEN':
                    peer_broken = True

            if from_vca_ee_id and to_vca_ee_id:
                # both peers are ready: add relation
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=from_entity.get('endpoint'),
                    endpoint_2=to_entity.get('endpoint'))
            elif peer_broken:
                # peer broken: this relation will never be possible, do not retry
                self.logger.debug(logging_text + ' relation discarded because a peer is BROKEN: {}'.format(relation))
            else:
                pending.append(relation)
        return pending

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # ns-configuration relations are matched by member-vnf-index
        my_member_vnf_index = my_vca.get('member-vnf-index')
        ns_relations = [r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
                        if _is_involved(r, my_member_vnf_index)]

        # vnf-configuration relations are matched by vdu_id
        my_vdu_id = my_vca.get('vdu_id')
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            vnf_relations += [r for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()
                              if _is_involved(r, my_vdu_id)]

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: wait for peers and add relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need the updated record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            ns_relations = await _add_ready_relations(db_nsr, ns_relations, 'member-vnf-index')
            vnf_relations = await _add_ready_relations(db_nsr, vnf_relations, 'vdu_id')

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2054
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the deployment of the KDUs present at the vnf records.

    The nsrs '_admin.deployed.K8s' list is reset. Then, for every 'kdur' of every vnfr: the model is obtained
    from 'helm-chart' or 'juju-bundle', the uuid of the k8s cluster is looked up at the 'k8sclusters'
    collection, the repos are synchronized (helm-chart only, once per cluster), the deployment info is stored
    at nsrs '_admin.deployed.K8s.<index>' and an asyncio task calling 'install' of the proper connector is
    created. Tasks are registered at self.lcm_tasks but they are not awaited here.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the ns record
    :param nslcmop_id: _id of the operation, used for registering the tasks
    :param db_vnfrs: dictionary with the vnf records; only its values are used
    :param db_vnfds: dictionary with the vnfd contents indexed by vnfd _id
    :param task_instantiation_info: dictionary where every created task is added as key, with a text as value
    :return: None. Raises LcmException in case of error; asyncio.CancelledError is propagated
    """
    # uuid of the already resolved clusters: {cluster type: {k8scluster _id: uuid}}
    uuid_by_cluster = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        known = uuid_by_cluster[cluster_type]
        if cluster_id not in known:
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id,
                                                                                            cluster_type))
            known[cluster_id] = k8s_id
        return known[cluster_id]

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        synchronized_clusters = []
        all_kdurs = ((vnfr, kdu_record) for vnfr in db_vnfrs.values()
                     for kdu_record in get_iterable(vnfr, "kdur"))

        for index, (vnfr_data, kdur) in enumerate(all_kdurs):
            desc_params = self._format_additional_params(kdur.get("additionalParams"))
            vnfd_id = vnfr_data.get('vnfd-id')
            pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))

            # the kdu type is the first of these keys with content at the kdur
            for k8sclustertype in ("helm-chart", "juju-bundle"):
                kdumodel = kdur.get(k8sclustertype)
                if kdumodel:
                    break
            else:
                raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                   "juju-bundle. Maybe an old NBI version is running".
                                   format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))

            # check if kdumodel is a file and exists
            try:
                # path format: /vnfdid/pkkdir/kdumodel
                filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
                if any(self.fs.file_exists(filename, mode=mode) for mode in ('file', 'dir')):
                    kdumodel = self.fs.path + filename
            except asyncio.CancelledError:
                raise
            except Exception:  # it is not a file
                pass

            k8s_cluster_id = kdur["k8s-cluster"]["id"]
            step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
            cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)

            if k8sclustertype == "helm-chart" and cluster_uuid not in synchronized_clusters:
                sync_task = asyncio.ensure_future(
                    self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                del_repo_list, added_repo_dict = await sync_task
                if del_repo_list or added_repo_dict:
                    unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                    updated = {'_admin.helm_charts_added.' + item: name
                               for item, name in added_repo_dict.items()}
                    self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                      "to_add: {}".format(k8s_cluster_id, del_repo_list, added_repo_dict))
                    self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                synchronized_clusters.append(cluster_uuid)

            step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                      kdur["kdu-name"], k8s_cluster_id)

            # store the deployment info at database before launching the install task
            db_path = "_admin.deployed.K8s.{}".format(index)
            db_nsr_update[db_path] = {
                "kdu-instance": None,
                "k8scluster-uuid": cluster_uuid,
                "k8scluster-type": k8sclustertype,
                "kdu-name": kdur["kdu-name"],
                "kdu-model": kdumodel,
            }
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": db_path}

            install_coro = self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True, params=desc_params,
                db_dict=db_dict, timeout=600, kdu_name=kdur["kdu-name"])
            task = asyncio.ensure_future(install_coro)

            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
            task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg)
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2162
2163 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2164 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2165 base_folder, task_instantiation_info, stage):
2166 # launch instantiate_N2VC in a asyncio task and register task object
2167 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2168 # if not found, create one entry and update database
2169
2170 # fill db_nsr._admin.deployed.VCA.<index>
2171 vca_index = -1
2172 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2173 if not vca_deployed:
2174 continue
2175 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2176 vca_deployed.get("vdu_id") == vdu_id and \
2177 vca_deployed.get("kdu_name") == kdu_name and \
2178 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2179 break
2180 else:
2181 # not found, create one.
2182 vca_deployed = {
2183 "member-vnf-index": member_vnf_index,
2184 "vdu_id": vdu_id,
2185 "kdu_name": kdu_name,
2186 "vdu_count_index": vdu_index,
2187 "operational-status": "init", # TODO revise
2188 "detailed-status": "", # TODO revise
2189 "step": "initial-deploy", # TODO revise
2190 "vnfd_id": vnfd_id,
2191 "vdu_name": vdu_name,
2192 }
2193 vca_index += 1
2194
2195 # create VCA and configurationStatus in db
2196 db_dict = {
2197 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2198 "configurationStatus.{}".format(vca_index): dict()
2199 }
2200 self.update_db_2("nsrs", nsr_id, db_dict)
2201
2202 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2203
2204 # Launch task
2205 task_n2vc = asyncio.ensure_future(
2206 self.instantiate_N2VC(
2207 logging_text=logging_text,
2208 vca_index=vca_index,
2209 nsi_id=nsi_id,
2210 db_nsr=db_nsr,
2211 db_vnfr=db_vnfr,
2212 vdu_id=vdu_id,
2213 kdu_name=kdu_name,
2214 vdu_index=vdu_index,
2215 deploy_params=deploy_params,
2216 config_descriptor=descriptor_config,
2217 base_folder=base_folder,
2218 nslcmop_id=nslcmop_id,
2219 stage=stage
2220 )
2221 )
2222 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2223 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2224 member_vnf_index or "", vdu_id or "")
2225
2226 # Check if this VNFD has a configured terminate action
2227 def _has_terminate_config_primitive(self, vnfd):
2228 vnf_config = vnfd.get("vnf-configuration")
2229 if vnf_config and vnf_config.get("terminate-config-primitive"):
2230 return True
2231 else:
2232 return False
2233
2234 @staticmethod
2235 def _get_terminate_config_primitive_seq_list(vnfd):
2236 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2237 # No need to check for existing primitive twice, already done before
2238 vnf_config = vnfd.get("vnf-configuration")
2239 seq_list = vnf_config.get("terminate-config-primitive")
2240 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2241 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2242 return seq_list_sorted
2243
2244 @staticmethod
2245 def _create_nslcmop(nsr_id, operation, params):
2246 """
2247 Creates a ns-lcm-opp content to be stored at database.
2248 :param nsr_id: internal id of the instance
2249 :param operation: instantiate, terminate, scale, action, ...
2250 :param params: user parameters for the operation
2251 :return: dictionary following SOL005 format
2252 """
2253 # Raise exception if invalid arguments
2254 if not (nsr_id and operation and params):
2255 raise LcmException(
2256 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2257 now = time()
2258 _id = str(uuid4())
2259 nslcmop = {
2260 "id": _id,
2261 "_id": _id,
2262 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2263 "operationState": "PROCESSING",
2264 "statusEnteredTime": now,
2265 "nsInstanceId": nsr_id,
2266 "lcmOperationType": operation,
2267 "startTime": now,
2268 "isAutomaticInvocation": False,
2269 "operationParams": params,
2270 "isCancelPending": False,
2271 "links": {
2272 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2273 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2274 }
2275 }
2276 return nslcmop
2277
2278 def _format_additional_params(self, params):
2279 params = params or {}
2280 for key, value in params.items():
2281 if str(value).startswith("!!yaml "):
2282 params[key] = yaml.safe_load(value[7:])
2283 return params
2284
2285 def _get_terminate_primitive_params(self, seq, vnf_index):
2286 primitive = seq.get('name')
2287 primitive_params = {}
2288 params = {
2289 "member_vnf_index": vnf_index,
2290 "primitive": primitive,
2291 "primitive_params": primitive_params,
2292 }
2293 desc_params = {}
2294 return self._map_primitive_params(seq, params, desc_params)
2295
2296 # sub-operations
2297
2298 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2299 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2300 if (op.get('operationState') == 'COMPLETED'):
2301 # b. Skip sub-operation
2302 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2303 return self.SUBOPERATION_STATUS_SKIP
2304 else:
2305 # c. Reintent executing sub-operation
2306 # The sub-operation exists, and operationState != 'COMPLETED'
2307 # Update operationState = 'PROCESSING' to indicate a reintent.
2308 operationState = 'PROCESSING'
2309 detailed_status = 'In progress'
2310 self._update_suboperation_status(
2311 db_nslcmop, op_index, operationState, detailed_status)
2312 # Return the sub-operation index
2313 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2314 # with arguments extracted from the sub-operation
2315 return op_index
2316
2317 # Find a sub-operation where all keys in a matching dictionary must match
2318 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2319 def _find_suboperation(self, db_nslcmop, match):
2320 if (db_nslcmop and match):
2321 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2322 for i, op in enumerate(op_list):
2323 if all(op.get(k) == match[k] for k in match):
2324 return i
2325 return self.SUBOPERATION_STATUS_NOT_FOUND
2326
2327 # Update status for a sub-operation given its index
2328 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2329 # Update DB for HA tasks
2330 q_filter = {'_id': db_nslcmop['_id']}
2331 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2332 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2333 self.db.set_one("nslcmops",
2334 q_filter=q_filter,
2335 update_dict=update_dict,
2336 fail_on_empty=False)
2337
2338 # Add sub-operation, return the index of the added sub-operation
2339 # Optionally, set operationState, detailed-status, and operationType
2340 # Status and type are currently set for 'scale' sub-operations:
2341 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2342 # 'detailed-status' : status message
2343 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2344 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2345 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2346 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2347 RO_nsr_id=None, RO_scaling_info=None):
2348 if not db_nslcmop:
2349 return self.SUBOPERATION_STATUS_NOT_FOUND
2350 # Get the "_admin.operations" list, if it exists
2351 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2352 op_list = db_nslcmop_admin.get('operations')
2353 # Create or append to the "_admin.operations" list
2354 new_op = {'member_vnf_index': vnf_index,
2355 'vdu_id': vdu_id,
2356 'vdu_count_index': vdu_count_index,
2357 'primitive': primitive,
2358 'primitive_params': mapped_primitive_params}
2359 if operationState:
2360 new_op['operationState'] = operationState
2361 if detailed_status:
2362 new_op['detailed-status'] = detailed_status
2363 if operationType:
2364 new_op['lcmOperationType'] = operationType
2365 if RO_nsr_id:
2366 new_op['RO_nsr_id'] = RO_nsr_id
2367 if RO_scaling_info:
2368 new_op['RO_scaling_info'] = RO_scaling_info
2369 if not op_list:
2370 # No existing operations, create key 'operations' with current operation as first list element
2371 db_nslcmop_admin.update({'operations': [new_op]})
2372 op_list = db_nslcmop_admin.get('operations')
2373 else:
2374 # Existing operations, append operation to list
2375 op_list.append(new_op)
2376
2377 db_nslcmop_update = {'_admin.operations': op_list}
2378 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2379 op_index = len(op_list) - 1
2380 return op_index
2381
2382 # Helper methods for scale() sub-operations
2383
2384 # pre-scale/post-scale:
2385 # Check for 3 different cases:
2386 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2387 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2388 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Look for a scale sub-operation inside db_nslcmop and register it when it is not there yet.

    :param db_nslcmop: nslcmop database record that holds the sub-operations
    :param vnf_index: member vnf index the sub-operation belongs to
    :param vnf_config_primitive: primitive name (used for pre/post-scale sub-operations)
    :param primitive_params: parameters of the primitive (used for pre/post-scale sub-operations)
    :param operationType: lcmOperationType to look for; forced to 'SCALE-RO' when both RO arguments are given
    :param RO_nsr_id: RO ns record id (RO scaling only)
    :param RO_scaling_info: RO scaling info (RO scaling only)
    :return: SUBOPERATION_STATUS_NEW when the sub-operation has just been added; otherwise the value returned by
        _reintent_or_skip_suboperation() for the index that was found
    """
    # A RO scaling sub-operation is identified only when both RO arguments are provided
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        search_filter = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        search_filter = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }

    found_index = self._find_suboperation(db_nslcmop, search_filter)
    if found_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # b./c. Already registered: the helper decides between skipping it and re-executing it
        return self._reintent_or_skip_suboperation(db_nslcmop, found_index)

    # a. New sub-operation: store it. The caller (scale) executes the primitive itself afterwards.
    # Only the arguments of the selected flavour (RO scaling or primitive) are stored, the others go as None
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None
    self._add_suboperation(db_nslcmop,
                           vnf_index,
                           None,           # vdu_id: always None for scaling
                           None,           # vdu_count_index: always None for scaling
                           None,           # vdu_name: always None for scaling
                           vnf_config_primitive,
                           primitive_params,
                           'PROCESSING',   # initial operationState
                           'In progress',  # initial detailed status
                           operationType,
                           RO_nsr_id,
                           RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2442
2443 # Function to return execution_environment id
2444
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id of the deployed VCA that matches vnf_index and vdu_id.

    :param vnf_index: value compared against the "member-vnf-index" of every VCA record
    :param vdu_id: value compared against the "vdu_id" of every VCA record
    :param vca_deployed_list: list of deployed VCA records; empty (None) positions are skipped
    :return: the "ee_id" of the first matching record, or None when nothing matches
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list or ():
        # the deployed list may contain empty placeholders, as other methods of this class already assume
        if not vca:
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca.get("ee_id")
    return None
2450
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop database record; sub-operations are added to it and its "nsInstanceId" is used
        to update the nsr
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. It can be None,
        in which case there are no terminate primitives to execute
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :return: None or exception
    """
    # execute terminate_primitives
    # the caller may not find a descriptor for this VCA; treat that as "nothing to execute" instead of failing
    terminate_primitives = (config_descriptor or {}).get("terminate-config-primitive")
    vdu_id = vca_deployed.get("vdu_id")
    vdu_count_index = vca_deployed.get("vdu_count_index")
    vdu_name = vca_deployed.get("vdu_name")
    vnf_index = vca_deployed.get("member-vnf-index")
    if terminate_primitives and vca_deployed.get("needed_terminate"):
        # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
        terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
        for seq in terminate_primitives:
            # For each sequence in list, get primitive and call _ns_execute_primitive()
            step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                vnf_index, seq.get("name"))
            self.logger.debug(logging_text + step)
            # Create the primitive for each sequence, i.e. "primitive": "touch"
            primitive = seq.get('name')
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

            # Add sub-operation
            self._add_suboperation(db_nslcmop,
                                   vnf_index,
                                   vdu_id,
                                   vdu_count_index,
                                   vdu_name,
                                   primitive,
                                   mapped_primitive_params)
            # Sub-operations: Call _ns_execute_primitive() instead of action()
            try:
                result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                         mapped_primitive_params)
            except LcmException:
                # this happens when VCA is not deployed. In this case it is not needed to terminate
                continue
            result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
            if result not in result_ok:
                raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                   "error {}".format(seq.get("name"), vnf_index, result_detail))
        # set that this VCA do not need terminated
        db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
        self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if destroy_ee:
        await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2507
async def _delete_N2VC(self, nsr_id: str):
    """
    Delete the whole N2VC namespace of a ns, flagging the configuration status before and after.

    :param nsr_id: ns record id; the namespace removed is "." followed by this id
    :return: None
    """
    write_status = self._write_all_config_status
    write_status(nsr_id=nsr_id, status='TERMINATING')
    await self.n2vc.delete_namespace(namespace="." + nsr_id)
    write_status(nsr_id=nsr_id, status='DELETED')
2513
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the record updated at the "nsrs" collection
    :param nslcmop_id: id of the operation whose status is reported with _write_op_status()
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with the joined failure details when the ns, nsd or some vnfd cannot be
        deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                              ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit exception instead of "assert False": asserts are removed when running with -O
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    # write to database only when the status text changes
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no "loop" argument: it was removed from asyncio.sleep at python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    # descriptors are removed only if the ns instance was deleted without failures
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2656
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns: execute the terminate primitives, delete the execution environments and KDUs and remove
    the deployment from RO. The result is written at database ("nsrs" and the operation status) and notified
    with a "ns"/"terminated" message.

    :param nsr_id: id of the ns record to terminate
    :param nslcmop_id: id of the terminate operation
    :return: None. Errors are not raised; they are written at the ns record and at the operation
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}  # task -> text describing it, used for the error messages
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to remove; the final status is still written at the finally clause
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        if not operation_params.get("skip_terminate_primitives"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
                    continue
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                if not config_descriptor:
                    # Without descriptor there are no terminate primitives to execute, and the execution
                    # environment is deleted at stage 3 anyway. Do not launch a task that can only fail
                    self.logger.debug(logging_text + "No configuration descriptor found for VCA {}".format(
                        vca.get("ee_id")))
                    continue
                task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
                                                               vca_index, False))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                if error_list:
                    return   # raise LcmException("; ".join(error_list))
                tasks_dict_info.clear()

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."
        stage[1] = "Deleting all execution environments."
        self.logger.debug(logging_text + stage[1])

        task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
        tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        task_delete_ro = asyncio.ensure_future(
            self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as e:
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the records are written only if they could be read at the beginning
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        if db_nslcmop:
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2870
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
    """
    Wait until all the tasks finish or the timeout expires, reporting the progress at the operation status.

    :param logging_text: prefix added to every log message
    :param created_tasks_info: dictionary with the tasks to wait for as keys and a text describing each one as
        value. The text is used to compose the error messages
    :param timeout: global time in seconds to wait, counted from the moment this method is called
    :param stage: list of strings written with _write_op_status(). This method overwrites the index 1 with
        "<done>/<total>." followed by the errors, if any
    :param nslcmop_id: id of the operation whose status is updated
    :param nsr_id: if provided, the errors are also written at the "nsrs" record
    :return: list of error texts (cancelled, failed and timed-out tasks). Empty if all the tasks ended well
    """
    time_start = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        # remaining time of the global timeout
        _timeout = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
                                                 return_when=asyncio.FIRST_COMPLETED)
        num_done += len(done)
        timed_out = not done
        if timed_out:
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
        for task in done:
            if task.cancelled():
                new_error = created_tasks_info[task] + ": Cancelled"
                # Logger.warn() is a deprecated alias removed at python 3.13
                self.logger.warning(logging_text + new_error)
                error_detail_list.append(new_error)
                error_list.append(new_error)
            else:
                exc = task.exception()
                if exc:
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    if isinstance(exc, (DbException, N2VCException, ROclient.ROClientException, LcmException)):
                        self.logger.error(logging_text + new_error)
                    else:
                        # unexpected exception: log the complete traceback
                        exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
                        self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
                else:
                    self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
                                                  "errorDetail": ". ".join(error_detail_list)})
        self._write_op_status(nslcmop_id, stage)
        if timed_out:
            # stop waiting, but only after the timeouts have been written at the status above
            break
    return error_detail_list
2919
2920 @staticmethod
2921 def _map_primitive_params(primitive_desc, params, instantiation_params):
2922 """
2923 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2924 The default-value is used. If it is between < > it look for a value at instantiation_params
2925 :param primitive_desc: portion of VNFD/NSD that describes primitive
2926 :param params: Params provided by user
2927 :param instantiation_params: Instantiation params provided by user
2928 :return: a dictionary with the calculated params
2929 """
2930 calculated_params = {}
2931 for parameter in primitive_desc.get("parameter", ()):
2932 param_name = parameter["name"]
2933 if param_name in params:
2934 calculated_params[param_name] = params[param_name]
2935 elif "default-value" in parameter or "value" in parameter:
2936 if "value" in parameter:
2937 calculated_params[param_name] = parameter["value"]
2938 else:
2939 calculated_params[param_name] = parameter["default-value"]
2940 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2941 and calculated_params[param_name].endswith(">"):
2942 if calculated_params[param_name][1:-1] in instantiation_params:
2943 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2944 else:
2945 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2946 format(calculated_params[param_name], primitive_desc["name"]))
2947 else:
2948 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2949 format(param_name, primitive_desc["name"]))
2950
2951 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2952 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2953 width=256)
2954 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2955 calculated_params[param_name] = calculated_params[param_name][7:]
2956
2957 # add always ns_config_info if primitive name is config
2958 if primitive_desc["name"] == "config":
2959 if "ns_config_info" in instantiation_params:
2960 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2961 return calculated_params
2962
def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_name, vdu_count_index, kdu_name=None):
    """
    Find the deployed VCA record that corresponds to an action and return its execution environment id.

    member_vnf_index and vdu_id must always be equal to the ones of the record. vdu_name, vdu_count_index and
    kdu_name are compared only when they are provided (vdu_count_index when it is not None).

    :param deployed_vca: list of deployed VCA records; empty positions are ignored
    :return: the "ee_id" of the first record that matches
    :raises LcmException: if no record matches or the matching record has no "ee_id"
    """
    for candidate in deployed_vca:
        if not candidate:
            continue
        same_owner = member_vnf_index == candidate["member-vnf-index"] and vdu_id == candidate["vdu_id"]
        if not same_owner:
            continue
        if vdu_name and candidate["vdu_name"] != vdu_name:
            continue
        if vdu_count_index is not None and candidate["vdu_count_index"] != vdu_count_index:
            continue
        if kdu_name and candidate["kdu_name"] != kdu_name:
            continue

        # first record that passes all the filters: it must have an execution environment
        ee_id = candidate.get("ee_id")
        if ee_id:
            return ee_id
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                           "execution environment"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

    # vca_deployed not found
    raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                       "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2989
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
                                retries_interval=30) -> (str, str):
    """
    Execute a primitive at an execution environment, retrying when it fails.

    :param ee_id: execution environment id
    :param primitive: name of the primitive. For "config" the params are sent inside a "params" key
    :param primitive_params: dictionary with the params of the primitive
    :param retries: number of additional attempts after the first failure
    :param retries_interval: seconds to wait between attempts
    :return: tuple ('COMPLETED', output of the primitive) or ('FAIL', text with the error)
    :raises LcmException: it is the only exception not converted into a 'FAIL' result
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        while retries >= 0:
            try:
                output = await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=primitive_params
                )
                # execution was OK
                break
            except Exception as e:
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry
                    # no "loop" argument: it was removed from asyncio.sleep at python 3.10 and providing it
                    # raised a TypeError that aborted the retries
                    await asyncio.sleep(retries_interval)
                else:
                    return 'FAIL', 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e)

        return 'COMPLETED', output

    except LcmException:
        raise
    except Exception as e:
        return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3020
3021 async def action(self, nsr_id, nslcmop_id):
3022
3023 # Try to lock HA task here
3024 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3025 if not task_is_locked_by_me:
3026 return
3027
3028 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3029 self.logger.debug(logging_text + "Enter")
3030 # get all needed from database
3031 db_nsr = None
3032 db_nslcmop = None
3033 db_nsr_update = {}
3034 db_nslcmop_update = {}
3035 nslcmop_operation_state = None
3036 nslcmop_operation_state_detail = None
3037 exc = None
3038 try:
3039 # wait for any previous tasks in process
3040 step = "Waiting for previous operations to terminate"
3041 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3042
3043 self._write_ns_status(
3044 nsr_id=nsr_id,
3045 ns_state=None,
3046 current_operation="RUNNING ACTION",
3047 current_operation_id=nslcmop_id
3048 )
3049
3050 step = "Getting information from database"
3051 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3052 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3053
3054 nsr_deployed = db_nsr["_admin"].get("deployed")
3055 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3056 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3057 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3058 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3059 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3060
3061 if vnf_index:
3062 step = "Getting vnfr from database"
3063 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3064 step = "Getting vnfd from database"
3065 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3066 else:
3067 if db_nsr.get("nsd"):
3068 db_nsd = db_nsr.get("nsd") # TODO this will be removed
3069 else:
3070 step = "Getting nsd from database"
3071 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3072
3073 # for backward compatibility
3074 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3075 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3076 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3077 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3078
3079 primitive = db_nslcmop["operationParams"]["primitive"]
3080 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3081
3082 # look for primitive
3083 config_primitive_desc = None
3084 if vdu_id:
3085 for vdu in get_iterable(db_vnfd, "vdu"):
3086 if vdu_id == vdu["id"]:
3087 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
3088 if config_primitive["name"] == primitive:
3089 config_primitive_desc = config_primitive
3090 break
3091 elif kdu_name:
3092 self.logger.debug(logging_text + "Checking actions in KDUs")
3093 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3094 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
3095 if primitive_params:
3096 desc_params.update(primitive_params)
3097 # TODO Check if we will need something at vnf level
3098 index = 0
3099 for kdu in get_iterable(nsr_deployed, "K8s"):
3100 if kdu_name == kdu["kdu-name"]:
3101 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
3102 "path": "_admin.deployed.K8s.{}".format(index)}
3103 if primitive == "upgrade":
3104 if desc_params.get("kdu_model"):
3105 kdu_model = desc_params.get("kdu_model")
3106 del desc_params["kdu_model"]
3107 else:
3108 kdu_model = kdu.get("kdu-model")
3109 parts = kdu_model.split(sep=":")
3110 if len(parts) == 2:
3111 kdu_model = parts[0]
3112
3113 if kdu.get("k8scluster-type") in self.k8scluster_map:
3114 output = await self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3115 cluster_uuid=kdu.get("k8scluster-uuid"),
3116 kdu_instance=kdu.get("kdu-instance"),
3117 atomic=True, kdu_model=kdu_model,
3118 params=desc_params, db_dict=db_dict,
3119 timeout=300)
3120
3121 else:
3122 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3123 raise LcmException(msg)
3124
3125 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
3126 break
3127 elif primitive == "rollback":
3128 if kdu.get("k8scluster-type") in self.k8scluster_map:
3129 output = await self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3130 cluster_uuid=kdu.get("k8scluster-uuid"),
3131 kdu_instance=kdu.get("kdu-instance"),
3132 db_dict=db_dict)
3133 else:
3134 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3135 raise LcmException(msg)
3136 break
3137 elif primitive == "status":
3138 if kdu.get("k8scluster-type") in self.k8scluster_map:
3139 output = await self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3140 cluster_uuid=kdu.get("k8scluster-uuid"),
3141 kdu_instance=kdu.get("kdu-instance"))
3142 else:
3143 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3144 raise LcmException(msg)
3145 break
3146 index += 1
3147
3148 else:
3149 raise LcmException("KDU '{}' not found".format(kdu_name))
3150 if output:
3151 db_nslcmop_update["detailed-status"] = output
3152 db_nslcmop_update["operationState"] = 'COMPLETED'
3153 db_nslcmop_update["statusEnteredTime"] = time()
3154 else:
3155 db_nslcmop_update["detailed-status"] = ''
3156 db_nslcmop_update["operationState"] = 'FAILED'
3157 db_nslcmop_update["statusEnteredTime"] = time()
3158 return
3159 elif vnf_index:
3160 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3161 if config_primitive["name"] == primitive:
3162 config_primitive_desc = config_primitive
3163 break
3164 else:
3165 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
3166 if config_primitive["name"] == primitive:
3167 config_primitive_desc = config_primitive
3168 break
3169
3170 if not config_primitive_desc:
3171 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3172 format(primitive))
3173
3174 desc_params = {}
3175 if vnf_index:
3176 if db_vnfr.get("additionalParamsForVnf"):
3177 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
3178 if vdu_id:
3179 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3180 if vdur.get("additionalParams"):
3181 desc_params = self._format_additional_params(vdur["additionalParams"])
3182 else:
3183 if db_nsr.get("additionalParamsForNs"):
3184 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
3185
3186 # TODO check if ns is in a proper status
3187 result, detailed_status = await self._ns_execute_primitive(
3188 self._look_for_deployed_vca(nsr_deployed["VCA"],
3189 member_vnf_index=vnf_index,
3190 vdu_id=vdu_id,
3191 vdu_name=vdu_name,
3192 vdu_count_index=vdu_count_index),
3193 primitive=primitive,
3194 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
3195
3196 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
3197 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
3198 db_nslcmop_update["statusEnteredTime"] = time()
3199 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
3200 return # database update is called inside finally
3201
3202 except (DbException, LcmException) as e:
3203 self.logger.error(logging_text + "Exit Exception {}".format(e))
3204 exc = e
3205 except asyncio.CancelledError:
3206 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3207 exc = "Operation was cancelled"
3208 except Exception as e:
3209 exc = traceback.format_exc()
3210 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3211 finally:
3212 if exc and db_nslcmop:
3213 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
3214 "FAILED {}: {}".format(step, exc)
3215 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3216 db_nslcmop_update["statusEnteredTime"] = time()
3217 try:
3218 if db_nslcmop_update:
3219 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3220 if db_nsr:
3221 self._write_ns_status(
3222 nsr_id=nsr_id,
3223 ns_state=None,
3224 current_operation="IDLE",
3225 current_operation_id=None,
3226 other_update=db_nsr_update
3227 )
3228 if exc:
3229 self._write_op_status(
3230 op_id=nslcmop_id,
3231 error_message=nslcmop_operation_state_detail
3232 )
3233 except DbException as e:
3234 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3235 self.logger.debug(logging_text + "Exit")
3236 if nslcmop_operation_state:
3237 try:
3238 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3239 "operationState": nslcmop_operation_state},
3240 loop=self.loop)
3241 except Exception as e:
3242 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3243 self.logger.debug(logging_text + "Exit")
3244 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3245 return nslcmop_operation_state, nslcmop_operation_state_detail
3246
async def scale(self, nsr_id, nslcmop_id):
    """
    Run a scale-out / scale-in operation (nslcmop) over one VNF of a network service.

    The work is done in three stages, each one registered as a sub-operation of the nslcmop so that a
    re-attempt can skip or repeat it:
      1. PRE-SCALE: execute the 'pre-scale-in' / 'pre-scale-out' scaling-config-action primitives.
      2. SCALE-RO: send the 'vdu-scaling' action to RO, poll it (up to one hour, every 5 seconds) and
         update the vnfr with the created/deleted VDUs.
      3. POST-SCALE: execute the 'post-scale-in' / 'post-scale-out' scaling-config-action primitives.

    The outcome is not returned nor raised: it is written at database ("nslcmops" and "nsrs") and, when an
    operation state has been set, notified with a "scaled" message on topic "ns" through self.msg.

    :param nsr_id: _id of the "nsrs" database record being scaled
    :param nslcmop_id: _id of the "nslcmops" database record with the operation; scaling parameters are read
        from its 'operationParams.scaleVnfData'
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # remember previous status to restore it when the operation finishes
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: number of scale-out operations currently applied for this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        else:
            # without this check the code below fails with an obscure KeyError 'scaling_direction'
            raise LcmException("input parameter 'scaleVnfData':'scaleVnfType':'{}' is not supported, "
                               "expected 'SCALE_OUT' or 'SCALE_IN'".format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # walk vdur backwards taking, per vdu-id, as many entries as 'count' indicates; the copy is used as a
            # countdown so that the original counters are kept
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the untouched counters; they are not part of the info provided to primitives
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the searched name; the loop variable is unbound or unrelated at this point
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): nslcmop_id is passed here as second positional argument, but the SCALE-RO call
                    # to the same helper does not pass it. Confirm against the helper signature.
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_name=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    # no primitive is involved here; vnf_config_primitive may not even be bound at this point
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: poll the RO action until it is reported as ACTIVE
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    else:
                        # second phase: update vnfr and wait for the management IP address
                        # NOTE(review): ns_status is not refreshed in this branch, it keeps the value obtained from
                        # the last action check, so only the "ACTIVE" case can be reached here
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet: try again at next iteration
                                pass
                        else:
                            assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    # NOTE(review): the 'loop' argument of asyncio.sleep was removed in Python 3.10; kept because
                    # the target interpreter version is not visible from here
                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same argument order as the rest of calls: (db_nslcmop, op_index, state, detail)
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the searched name; the loop variable is unbound or unrelated at this point
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(scaling_group,
                                                                                                 vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): nslcmop_id is passed here as second positional argument, but the SCALE-RO call
                    # to the same helper does not pass it. Confirm against the helper signature.
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_name=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # database updates and notification are done here for both the success and the failure paths
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process tells which stage was in progress when the failure happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")