fix 1049. Ignore juju model delete not found exception
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
# Default timeouts. The arithmetic (5 * 60, 2 * 3600) suggests seconds - TODO confirm units against the callers
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 10 * 60  # timeout for primitive execution
timeout_progress_primitive = 2 * 60  # timeout for some progress in a primitive execution

# Negative sentinel values for sub-operation handling
# NOTE(review): presumably returned in place of a (non negative) sub-operation index - verify against the callers
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
# Task name used for the VCA deployment (consumers are outside this section of the file)
task_name_deploy_vca = "Deploying VCA"
58
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Initializes the base class with database, messaging and filesystem storage, and creates the N2VC (juju),
    K8s (helm and juju) and RO connectors
    :param db: database connector, passed to the base class and to the connectors
    :param msg: messaging connector, passed to the base class
    :param fs: filesystem storage connector, passed to the base class and to the connectors
    :param lcm_tasks: stored as self.lcm_tasks
    :param config: dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read here
    :param loop: asyncio loop, passed to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # work over a copy, so that the caller configuration is not shared with the connectors
    self.vca_config = config["VCA"].copy()
    vca = self.vca_config

    # create N2VC connector
    vca_url = '{}:{}'.format(vca['host'], vca['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca.get('user', None),
        vca_config=vca,
        on_update_db=self._on_update_n2vc_db
    )

    # create K8s connectors, one per deployment technology
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=vca.get("kubectlpath"),
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=vca.get("kubectlpath"),
        juju_command=vca.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # map between the k8s deployment type and the connector that manages it
    helm, juju = self.k8sclusterhelm, self.k8sclusterjuju
    self.k8scluster_map = {"helm-chart": helm, "chart": helm, "juju-bundle": juju, "juju": juju}

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
116
117 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
118
119 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
120
121 try:
122 # TODO filter RO descriptor fields...
123
124 # write to database
125 db_dict = dict()
126 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
127 db_dict['deploymentStatus'] = ro_descriptor
128 self.update_db_2("nsrs", nsrs_id, db_dict)
129
130 except Exception as e:
131 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
132
133 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
134
135 # remove last dot from path (if exists)
136 if path.endswith('.'):
137 path = path[:-1]
138
139 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
140 # .format(table, filter, path, updated_data))
141
142 try:
143
144 nsr_id = filter.get('_id')
145
146 # read ns record from database
147 nsr = self.db.get_one(table='nsrs', q_filter=filter)
148 current_ns_status = nsr.get('nsState')
149
150 # get vca status for NS
151 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
152
153 # vcaStatus
154 db_dict = dict()
155 db_dict['vcaStatus'] = status_dict
156
157 # update configurationStatus for this VCA
158 try:
159 vca_index = int(path[path.rfind(".")+1:])
160
161 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
162 vca_status = vca_list[vca_index].get('status')
163
164 configuration_status_list = nsr.get('configurationStatus')
165 config_status = configuration_status_list[vca_index].get('status')
166
167 if config_status == 'BROKEN' and vca_status != 'failed':
168 db_dict['configurationStatus'][vca_index] = 'READY'
169 elif config_status != 'BROKEN' and vca_status == 'failed':
170 db_dict['configurationStatus'][vca_index] = 'BROKEN'
171 except Exception as e:
172 # not update configurationStatus
173 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
174
175 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
176 # if nsState = 'DEGRADED' check if all is OK
177 is_degraded = False
178 if current_ns_status in ('READY', 'DEGRADED'):
179 error_description = ''
180 # check machines
181 if status_dict.get('machines'):
182 for machine_id in status_dict.get('machines'):
183 machine = status_dict.get('machines').get(machine_id)
184 # check machine agent-status
185 if machine.get('agent-status'):
186 s = machine.get('agent-status').get('status')
187 if s != 'started':
188 is_degraded = True
189 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
190 # check machine instance status
191 if machine.get('instance-status'):
192 s = machine.get('instance-status').get('status')
193 if s != 'running':
194 is_degraded = True
195 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
196 # check applications
197 if status_dict.get('applications'):
198 for app_id in status_dict.get('applications'):
199 app = status_dict.get('applications').get(app_id)
200 # check application status
201 if app.get('status'):
202 s = app.get('status').get('status')
203 if s != 'active':
204 is_degraded = True
205 error_description += 'application {} status={} ; '.format(app_id, s)
206
207 if error_description:
208 db_dict['errorDescription'] = error_description
209 if current_ns_status == 'READY' and is_degraded:
210 db_dict['nsState'] = 'DEGRADED'
211 if current_ns_status == 'DEGRADED' and not is_degraded:
212 db_dict['nsState'] = 'READY'
213
214 # write to database
215 self.update_db_2("nsrs", nsr_id, db_dict)
216
217 except (asyncio.CancelledError, asyncio.TimeoutError):
218 raise
219 except Exception as e:
220 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
221
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The cloud-init (or the content of the
    cloud-init-file) of every vdu is rendered with Jinja2, using additionalParams as variables
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Must contain all the cloud-init variables
    :param nsrId: Id of the NSR. Not used in this method
    :return: copy of vnfd adapted to RO. LcmException is raised on errors
    """
    # keys not used by RO: configuration, monitoring, scaling and internal ones
    keys_unused_by_RO = ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                         "kdu", "k8s-cluster")
    try:
        vnfd_RO = deepcopy(vnfd)
        for unused_key in keys_unused_by_RO:
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable used by the template must be supplied at additionalParams
            parsed_content = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_content):
                if not (additionalParams and var in additionalParams.keys()):
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
279
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. If empty, None is returned
    :param nsd: ns descriptor. Its "constituent-vnfd" list relates each member-vnf-index with its vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd content
    :param n2vc_key_list: value stored as "mgmt_keys" at every VDU that requires ssh access. None is treated as
        an empty list
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised on invalid parameters
        or when a needed VIM/WIM account is not ENABLED
    """
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO id. Results are cached at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only string ids are translated (and cached at wim_2_RO). Any other value is returned as it is
        if not isinstance(wim_account, str):
            return wim_account
        if wim_account in wim_2_RO:
            return wim_2_RO[wim_account]

        db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
        if db_wim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]))
        RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
        wim_2_RO[wim_account] = RO_wim_id
        return RO_wim_id

    def ip_profile_2_RO(ip_profile):
        # renames the OSM ip-profile keys/values to the ones used by RO. Works over a copy
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = [ds['address'] for ds in RO_ip_profile.pop("dns-server")]
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    def vld_sites_2_RO(vim_network):
        # vim_network is either a dictionary {vim_account: vim_net} or a single network (str)
        if isinstance(vim_network, dict):
            return [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                    for vim_account, vim_net in vim_network.items()]
        return [{"netmap-use": vim_network}]

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the n2vc keys at the VDUs that need ssh access for its configuration
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # a vdu without interfaces is skipped instead of failing when iterating None
                for vdu_interface in vdu.get("interface") or ():
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # a nsd without constituent-vnfd is skipped instead of failing when iterating None
            for vnf_member in nsd.get("constituent-vnfd") or ():
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # instantiation parameters per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    # (OSM instantiation key, RO key)
                    for osm_key, RO_key in (("ip-address", "ip_address"),
                                            ("mac-address", "mac_address"),
                                            ("floating-ip-required", "floating-ip")):
                        if interface_params.get(osm_key):
                            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                         vdu_params["id"], "interfaces", interface_params["name"],
                                                         RO_key),
                                          interface_params[osm_key])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # all the vdus have been inspected without finding the interface
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # instantiation parameters per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = vld_sites_2_RO(vld_params["vim-network-name"])
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        # when both are provided, vim-network-id is written after vim-network-name over the same "sites" key
        if vld_params.get("vim-network-id"):
            RO_vld_sites = vld_sites_2_RO(vld_params["vim-network-id"])
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor keep here the values of the matching interface
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
550
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr for a scaling operation, and writes the result to the database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. The provided dictionary is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. The provided dictionary is not
        modified
    :return: None. LcmException is raised if some requested vdu-id-ref has no vdur where to apply the operation
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # iterate backwards, so that the insertions/deletions do not alter the positions still to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new vdur is a copy of the previous one with a new _id and the next count-index
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each pending operation is reported with its own error
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
589
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the VIM information reported by RO for the matching net
    :param ns_update_nsr: dictionary to be filled with the updated info, using "vld.<index>" as keys
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched to each vld by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if a vld is not found at the RO info
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net created for this vld, None if there is not any
        net_RO = next((net for net in get_iterable(nsr_desc_RO, "nets")
                       if not vld["id"] != net.get("ns_net_osm_id")), None)
        if net_RO is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld["vim-id"] = net_RO.get("vim_net_id")
        vld["name"] = net_RO.get("vim_name")
        vld["status"] = net_RO.get("status")
        vld["status-detailed"] = net_RO.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
611
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Sets the status of the provided vnfrs to ERROR, both at the database and at the provided content. Every vdur
    that has not a status yet is also set to ERROR, with error_text (if any) as its detailed status
    :param db_vnfrs: dictionary whose values are the vnfr contents. Its vdur entries are modified
    :param error_text: text, or any object convertible with str(), stored at "status-detailed". Can be empty
    :return: None. A DbException is not propagated; it is logged as an error
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # store at database the same text that is set at the vdur content, and not a literal
                        # "ERROR" that hides the reason of the failure
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
626
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (with their "vms") and "nets" are used
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised if a vnf with vdur has
        no IP address, neither at the RO info nor at the vnfr
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue

            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO can provide several addresses separated by ";". The first one is used
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address") and db_vnfr.get("vdur"):
                # if not VDUs, there is not ip_address
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if vdur.get("pdu-type"):
                    continue
                # number of RO vms of this same vdu skipped so far; it is matched with the vdur count-index
                vms_skipped = 0
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vms_skipped:
                        vms_skipped += 1
                        continue

                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0] if vdur_RO.get("ip_address") else None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            # this vnfr is done, go for the next one
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
699
700 def _get_ns_config_info(self, nsr_id):
701 """
702 Generates a mapping between vnf,vdu elements and the N2VC id
703 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
704 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
705 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
706 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
707 """
708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
709 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
710 mapping = {}
711 ns_config_info = {"osm-config-mapping": mapping}
712 for vca in vca_deployed_list:
713 if not vca["member-vnf-index"]:
714 continue
715 if not vca["vdu_id"]:
716 mapping[vca["member-vnf-index"]] = vca["application"]
717 else:
718 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
719 vca["application"]
720 return ns_config_info
721
722 @staticmethod
723 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
724 """
725 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
726 primitives as verify-ssh-credentials, or config when needed
727 :param desc_primitive_list: information of the descriptor
728 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
729 this element contains a ssh public key
730 :return: The modified list. Can ba an empty list, but always a list
731 """
732 if desc_primitive_list:
733 primitive_list = desc_primitive_list.copy()
734 else:
735 primitive_list = []
736 # look for primitive config, and get the position. None if not present
737 config_position = None
738 for index, primitive in enumerate(primitive_list):
739 if primitive["name"] == "config":
740 config_position = index
741 break
742
743 # for NS, add always a config primitive if not present (bug 874)
744 if not vca_deployed["member-vnf-index"] and config_position is None:
745 primitive_list.insert(0, {"name": "config", "parameter": []})
746 config_position = 0
747 # for VNF/VDU add verify-ssh-credentials after config
748 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
749 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
750 return primitive_list
751
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Deploy the network service at RO and wait until it is ready.
    Creates (or reuses) one RO vnfd per constituent vnf and the RO nsd; then creates (or reuses) the RO ns
    instance and polls it until it is ACTIVE and the vnfrs can be updated, or until the deployment timeout
    expires. Progress is written to the "nsrs" and "nslcmops" database records.
    :param logging_text: prefix use for logging
    :param nsr_id: _id of the nsrs record
    :param nsd: ns descriptor content
    :param db_nsr: nsrs database record. Its _admin.deployed.RO.vnfd list is modified in place
    :param db_nslcmop: nslcmops database record. '_id' and 'operationParams' are read
    :param db_vnfrs: vnfrs database records indexed by member-vnf-index
    :param db_vnfds_ref: vnfd contents indexed by the 'vnfd-id-ref' used at nsd constituent-vnfd
    :param n2vc_key_list: forwarded to ns_params_2_RO
    :param stage: list of status texts. Item [2] is overwritten with the current step
    :return: None
    :raises ROclient.ROClientException, LcmException, DbException: re-raised after setting the vnfrs at error
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0   # number of descriptors created at RO
        vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        await self.do_placement(logging_text, db_nslcmop, db_vnfrs)

        # deploy RO

        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}'  member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # the descriptor sent to RO is a copy whose vnfd references point to the ids used at RO
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                # a not found ns is not an error: it will be created below
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait  look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                        if task_dependency:
                            stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                            self.logger.debug(logging_text + stage[2])
                            await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "VIM: ({})".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                    try:
                        self.ns_update_vnfr(db_vnfrs, desc)
                        break
                    except LcmExceptionNoMgmtIP:
                        pass
                else:
                    # explicit exception instead of 'assert': asserts are removed when running with -O and an
                    # AssertionError would skip the error handling at the end of this method
                    raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
            # 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
            await asyncio.sleep(5)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException) as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
970
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    The "vnfrs" record is polled every 10 seconds until the target vdu is found with an ip address. When both
    pub_key and user are provided, the key is injected with an RO action once the ns is deployed at RO.
    :param logging_text: prefix use for logging
    :param nsr_id: _id of the nsrs record
    :param vnfr_id: _id of the vnfrs record
    :param vdu_id: vdu-id-ref of the target vdu. If empty, the vdu is located by the ip-address of the vnf
    :param vdu_index: count-index of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: target vnf/vm in error state, vdu not found, maximum number of iterations reached or
        key injection failing 20 times
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # RO errors are retried at next iterations; only the first one is logged
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            break

    return ip_address
1072
1073 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1074 """
1075 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1076 """
1077 my_vca = vca_deployed_list[vca_index]
1078 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1079 # vdu or kdu: no dependencies
1080 return
1081 timeout = 300
1082 while timeout >= 0:
1083 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1084 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1085 configuration_status_list = db_nsr["configurationStatus"]
1086 for index, vca_deployed in enumerate(configuration_status_list):
1087 if index == vca_index:
1088 # myself
1089 continue
1090 if not my_vca.get("member-vnf-index") or \
1091 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1092 internal_status = configuration_status_list[index].get("status")
1093 if internal_status == 'READY':
1094 continue
1095 elif internal_status == 'BROKEN':
1096 raise LcmException("Configuration aborted because dependent charm/s has failed")
1097 else:
1098 break
1099 else:
1100 # no dependencies, return
1101 return
1102 await asyncio.sleep(10)
1103 timeout -= 1
1104
1105 raise LcmException("Configuration aborted because dependent charm/s timeout")
1106
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
    """
    Deploy and configure one VCA element: get an execution environment, install the charm, obtain the
    management ip address and execute the initial config primitives.
    The progress is written to the entry 'vca_index' of configurationStatus at the "nsrs" record.
    :param logging_text: prefix use for logging
    :param vca_index: index of this element at _admin.deployed.VCA and at configurationStatus
    :param nsi_id: used as first part of the namespace. An empty text is used if not provided
    :param db_nsr: nsrs database record
    :param db_vnfr: vnfrs database record, or empty when configuring the NS
    :param vdu_id: vdu identifier, or empty when the element is not a VDU
    :param kdu_name: kdu name, or empty when the element is not a KDU
    :param vdu_index: index of the vdu. 0 is used for the names if not provided
    :param config_descriptor: configuration descriptor. Keys 'juju', 'config-access',
        'initial-config-primitive' and 'terminate-config-primitive' are read
    :param deploy_params: params used to map the primitive parameters. 'rw_mgmt_ip' (and 'ns_config_info' for
        the NS) are added to this dictionary
    :param base_folder: dictionary with the 'folder' and 'pkg-dir' used to compose the charm path
    :param nslcmop_id: _id of the nslcmops record
    :param stage: list of status texts. Item [0] is overwritten before running the primitives
    :return: None
    :raises LcmException: on any failure. The original exception is chained and status is set to 'BROKEN'
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    # passed to every n2vc call of this method
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # 'step' contains the current action; it is used for logging and for the exception text
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace is "<nsi>.<ns>[.<vnfr>[.<vdu>-<index> | .<kdu>]]"
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = 'KDU'
                element_under_configuration = kdu_name

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # it is a proxy charm unless 'juju.proxy' is explicitly False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # not a proxy charm: the VM must be up to register it as execution environment
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, they need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        db_nsr_update = {db_update_entry + "model": model_name,
                         db_update_entry + "application": application_name,
                         db_update_entry + "ee_id": ee_id}

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if not is_proxy_charm:
            # the params of the "config" primitive, if any, are provided when installing the software
            initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
            if initial_config_primitive_list:
                for primitive in initial_config_primitive_list:
                    if primitive["name"] == "config":
                        config = self._map_primitive_params(
                            primitive,
                            {},
                            deploy_params
                        )
                        break
        await self.n2vc.install_configuration_sw(
            ee_id=ee_id,
            artifact_path=artifact_path,
            db_dict=db_dict,
            config=config
        )

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # a failure sorting is only logged; the list is used as it is
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # the traceback is only logged for the exception types not listed here
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1378
1379 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1380 error_description: str = None, error_detail: str = None, other_update: dict = None):
1381 """
1382 Update db_nsr fields.
1383 :param nsr_id:
1384 :param ns_state:
1385 :param current_operation:
1386 :param current_operation_id:
1387 :param error_description:
1388 :param error_detail:
1389 :param other_update: Other required changes at database if provided, will be cleared
1390 :return:
1391 """
1392 try:
1393 db_dict = other_update or {}
1394 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1395 db_dict["_admin.current-operation"] = current_operation_id
1396 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1397 db_dict["currentOperation"] = current_operation
1398 db_dict["currentOperationID"] = current_operation_id
1399 db_dict["errorDescription"] = error_description
1400 db_dict["errorDetail"] = error_detail
1401
1402 if ns_state:
1403 db_dict["nsState"] = ns_state
1404 self.update_db_2("nsrs", nsr_id, db_dict)
1405 except DbException as e:
1406 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1407
1408 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1409 operation_state: str = None, other_update: dict = None):
1410 try:
1411 db_dict = other_update or {}
1412 db_dict['queuePosition'] = queuePosition
1413 if isinstance(stage, list):
1414 db_dict['stage'] = stage[0]
1415 db_dict['detailed-status'] = " ".join(stage)
1416 elif stage is not None:
1417 db_dict['stage'] = str(stage)
1418
1419 if error_message is not None:
1420 db_dict['errorMessage'] = error_message
1421 if operation_state is not None:
1422 db_dict['operationState'] = operation_state
1423 db_dict["statusEnteredTime"] = time()
1424 self.update_db_2("nslcmops", op_id, db_dict)
1425 except DbException as e:
1426 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1427
1428 def _write_all_config_status(self, db_nsr: dict, status: str):
1429 try:
1430 nsr_id = db_nsr["_id"]
1431 # configurationStatus
1432 config_status = db_nsr.get('configurationStatus')
1433 if config_status:
1434 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1435 enumerate(config_status) if v}
1436 # update status
1437 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1438
1439 except DbException as e:
1440 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1441
1442 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1443 element_under_configuration: str = None, element_type: str = None,
1444 other_update: dict = None):
1445
1446 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1447 # .format(vca_index, status))
1448
1449 try:
1450 db_path = 'configurationStatus.{}.'.format(vca_index)
1451 db_dict = other_update or {}
1452 if status:
1453 db_dict[db_path + 'status'] = status
1454 if element_under_configuration:
1455 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1456 if element_type:
1457 db_dict[db_path + 'elementType'] = element_type
1458 self.update_db_2("nsrs", nsr_id, db_dict)
1459 except DbException as e:
1460 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1461 .format(status, nsr_id, vca_index, e))
1462
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement optimization when the operation has 'placement-engine' set to "PLA".
    A "get_placement" message is sent to topic "pla" and the nslcmops record is polled every 5 seconds
    (5 times at most) until '_admin.pla' is filled. For each vnf of the result with a 'vimAccountId', it is
    stored as 'vim-account-id' at the corresponding vnfrs record. Nothing is done for other placement engines.
    :param logging_text: prefix use for logging
    :param db_nslcmop: nslcmops database record. '_id' and 'operationParams' are read
    :param db_vnfrs: vnfrs database records indexed by member-vnf-index
    :return: None
    :raises LcmException: if the placement result is not available after polling
    """
    placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
    if placement_engine == "PLA":
        # '_id' is used everywhere (also for logging): it is the key used to send the message and to read the
        # record from database
        nslcmop_id = db_nslcmop['_id']
        self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(nslcmop_id))
        await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
        db_poll_interval = 5
        wait = db_poll_interval * 4
        pla_result = None
        while not pla_result and wait >= 0:
            await asyncio.sleep(db_poll_interval)
            wait -= db_poll_interval
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))

        if not pla_result:
            raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

        for pla_vnf in pla_result['vnf']:
            vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
            # skip entries without vim account or not belonging to a vnf of this ns
            if not pla_vnf.get('vimAccountId') or not vnfr:
                continue
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
    return
1486
def update_nsrs_with_pla_result(self, params):
    """
    Store the received placement result at '_admin.pla' of the nslcmops record.
    Errors are not raised, they are only logged as a warning.
    :param params: dictionary with a 'placement' entry, that contains the 'nslcmopId' of the record to update
    :return: None
    """
    # bound before the 'try' because it is used at the exception handler
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1493
1494 async def instantiate(self, nsr_id, nslcmop_id):
1495 """
1496
1497 :param nsr_id: ns instance to deploy
1498 :param nslcmop_id: operation to run
1499 :return:
1500 """
1501
1502 # Try to lock HA task here
1503 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1504 if not task_is_locked_by_me:
1505 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1506 return
1507
1508 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1509 self.logger.debug(logging_text + "Enter")
1510
1511 # get all needed from database
1512
1513 # database nsrs record
1514 db_nsr = None
1515
1516 # database nslcmops record
1517 db_nslcmop = None
1518
1519 # update operation on nsrs
1520 db_nsr_update = {}
1521 # update operation on nslcmops
1522 db_nslcmop_update = {}
1523
1524 nslcmop_operation_state = None
1525 db_vnfrs = {} # vnf's info indexed by member-index
1526 # n2vc_info = {}
1527 tasks_dict_info = {} # from task to info text
1528 exc = None
1529 error_list = []
1530 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1531 # ^ stage, step, VIM progress
1532 try:
1533 # wait for any previous tasks in process
1534 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1535
1536 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1537 stage[1] = "Reading from database,"
1538 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1539 db_nsr_update["detailed-status"] = "creating"
1540 db_nsr_update["operational-status"] = "init"
1541 self._write_ns_status(
1542 nsr_id=nsr_id,
1543 ns_state="BUILDING",
1544 current_operation="INSTANTIATING",
1545 current_operation_id=nslcmop_id,
1546 other_update=db_nsr_update
1547 )
1548 self._write_op_status(
1549 op_id=nslcmop_id,
1550 stage=stage,
1551 queuePosition=0
1552 )
1553
1554 # read from db: operation
1555 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1556 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1557 ns_params = db_nslcmop.get("operationParams")
1558 if ns_params and ns_params.get("timeout_ns_deploy"):
1559 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1560 else:
1561 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1562
1563 # read from db: ns
1564 stage[1] = "Getting nsr={} from db".format(nsr_id)
1565 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1566 # nsd is replicated into ns (no db read)
1567 nsd = db_nsr["nsd"]
1568 # nsr_name = db_nsr["name"] # TODO short-name??
1569
1570 # read from db: vnf's of this ns
1571 stage[1] = "Getting vnfrs from db"
1572 self.logger.debug(logging_text + stage[1])
1573 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1574
1575 # read from db: vnfd's for every vnf
1576 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1577 db_vnfds = {} # every vnfd data indexed by vnf id
1578 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1579
1580 # for each vnf in ns, read vnfd
1581 for vnfr in db_vnfrs_list:
1582 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1583 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1584 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1585 # if we haven't this vnfd, read it from db
1586 if vnfd_id not in db_vnfds:
1587 # read from db
1588 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1589 self.logger.debug(logging_text + stage[1])
1590 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1591
1592 # store vnfd
1593 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1594 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1595 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1596
1597 # Get or generates the _admin.deployed.VCA list
1598 vca_deployed_list = None
1599 if db_nsr["_admin"].get("deployed"):
1600 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1601 if vca_deployed_list is None:
1602 vca_deployed_list = []
1603 configuration_status_list = []
1604 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1605 db_nsr_update["configurationStatus"] = configuration_status_list
1606 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1607 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1608 elif isinstance(vca_deployed_list, dict):
1609 # maintain backward compatibility. Change a dict to list at database
1610 vca_deployed_list = list(vca_deployed_list.values())
1611 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1612 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1613
1614 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1615 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1616 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1617
1618 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1619 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1620 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1621
1622 # n2vc_redesign STEP 2 Deploy Network Scenario
1623 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1624 self._write_op_status(
1625 op_id=nslcmop_id,
1626 stage=stage
1627 )
1628
1629 stage[1] = "Deploying KDUs,"
1630 # self.logger.debug(logging_text + "Before deploy_kdus")
1631 # Call to deploy_kdus in case exists the "vdu:kdu" param
1632 await self.deploy_kdus(
1633 logging_text=logging_text,
1634 nsr_id=nsr_id,
1635 nslcmop_id=nslcmop_id,
1636 db_vnfrs=db_vnfrs,
1637 db_vnfds=db_vnfds,
1638 task_instantiation_info=tasks_dict_info,
1639 )
1640
1641 stage[1] = "Getting VCA public key."
1642 # n2vc_redesign STEP 1 Get VCA public ssh-key
1643 # feature 1429. Add n2vc public key to needed VMs
1644 n2vc_key = self.n2vc.get_public_key()
1645 n2vc_key_list = [n2vc_key]
1646 if self.vca_config.get("public_key"):
1647 n2vc_key_list.append(self.vca_config["public_key"])
1648
1649 stage[1] = "Deploying NS at VIM."
1650 task_ro = asyncio.ensure_future(
1651 self.instantiate_RO(
1652 logging_text=logging_text,
1653 nsr_id=nsr_id,
1654 nsd=nsd,
1655 db_nsr=db_nsr,
1656 db_nslcmop=db_nslcmop,
1657 db_vnfrs=db_vnfrs,
1658 db_vnfds_ref=db_vnfds_ref,
1659 n2vc_key_list=n2vc_key_list,
1660 stage=stage
1661 )
1662 )
1663 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1664 tasks_dict_info[task_ro] = "Deploying at VIM"
1665
1666 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1667 stage[1] = "Deploying Execution Environments."
1668 self.logger.debug(logging_text + stage[1])
1669
1670 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1671 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1672 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1673 vnfd_id = c_vnf["vnfd-id-ref"]
1674 vnfd = db_vnfds_ref[vnfd_id]
1675 member_vnf_index = str(c_vnf["member-vnf-index"])
1676 db_vnfr = db_vnfrs[member_vnf_index]
1677 base_folder = vnfd["_admin"]["storage"]
1678 vdu_id = None
1679 vdu_index = 0
1680 vdu_name = None
1681 kdu_name = None
1682
1683 # Get additional parameters
1684 deploy_params = {}
1685 if db_vnfr.get("additionalParamsForVnf"):
1686 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1687
1688 descriptor_config = vnfd.get("vnf-configuration")
1689 if descriptor_config and descriptor_config.get("juju"):
1690 self._deploy_n2vc(
1691 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1692 db_nsr=db_nsr,
1693 db_vnfr=db_vnfr,
1694 nslcmop_id=nslcmop_id,
1695 nsr_id=nsr_id,
1696 nsi_id=nsi_id,
1697 vnfd_id=vnfd_id,
1698 vdu_id=vdu_id,
1699 kdu_name=kdu_name,
1700 member_vnf_index=member_vnf_index,
1701 vdu_index=vdu_index,
1702 vdu_name=vdu_name,
1703 deploy_params=deploy_params,
1704 descriptor_config=descriptor_config,
1705 base_folder=base_folder,
1706 task_instantiation_info=tasks_dict_info,
1707 stage=stage
1708 )
1709
1710 # Deploy charms for each VDU that supports one.
1711 for vdud in get_iterable(vnfd, 'vdu'):
1712 vdu_id = vdud["id"]
1713 descriptor_config = vdud.get('vdu-configuration')
1714 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1715 if vdur.get("additionalParams"):
1716 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1717 else:
1718 deploy_params_vdu = deploy_params
1719 if descriptor_config and descriptor_config.get("juju"):
1720 # look for vdu index in the db_vnfr["vdu"] section
1721 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1722 # if vdur["vdu-id-ref"] == vdu_id:
1723 # break
1724 # else:
1725 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1726 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1727 # vdu_name = vdur.get("name")
1728 vdu_name = None
1729 kdu_name = None
1730 for vdu_index in range(int(vdud.get("count", 1))):
1731 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1732 self._deploy_n2vc(
1733 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1734 member_vnf_index, vdu_id, vdu_index),
1735 db_nsr=db_nsr,
1736 db_vnfr=db_vnfr,
1737 nslcmop_id=nslcmop_id,
1738 nsr_id=nsr_id,
1739 nsi_id=nsi_id,
1740 vnfd_id=vnfd_id,
1741 vdu_id=vdu_id,
1742 kdu_name=kdu_name,
1743 member_vnf_index=member_vnf_index,
1744 vdu_index=vdu_index,
1745 vdu_name=vdu_name,
1746 deploy_params=deploy_params_vdu,
1747 descriptor_config=descriptor_config,
1748 base_folder=base_folder,
1749 task_instantiation_info=tasks_dict_info,
1750 stage=stage
1751 )
1752 for kdud in get_iterable(vnfd, 'kdu'):
1753 kdu_name = kdud["name"]
1754 descriptor_config = kdud.get('kdu-configuration')
1755 if descriptor_config and descriptor_config.get("juju"):
1756 vdu_id = None
1757 vdu_index = 0
1758 vdu_name = None
1759 # look for vdu index in the db_vnfr["vdu"] section
1760 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1761 # if vdur["vdu-id-ref"] == vdu_id:
1762 # break
1763 # else:
1764 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1765 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1766 # vdu_name = vdur.get("name")
1767 # vdu_name = None
1768
1769 self._deploy_n2vc(
1770 logging_text=logging_text,
1771 db_nsr=db_nsr,
1772 db_vnfr=db_vnfr,
1773 nslcmop_id=nslcmop_id,
1774 nsr_id=nsr_id,
1775 nsi_id=nsi_id,
1776 vnfd_id=vnfd_id,
1777 vdu_id=vdu_id,
1778 kdu_name=kdu_name,
1779 member_vnf_index=member_vnf_index,
1780 vdu_index=vdu_index,
1781 vdu_name=vdu_name,
1782 deploy_params=deploy_params,
1783 descriptor_config=descriptor_config,
1784 base_folder=base_folder,
1785 task_instantiation_info=tasks_dict_info,
1786 stage=stage
1787 )
1788
1789 # Check if this NS has a charm configuration
1790 descriptor_config = nsd.get("ns-configuration")
1791 if descriptor_config and descriptor_config.get("juju"):
1792 vnfd_id = None
1793 db_vnfr = None
1794 member_vnf_index = None
1795 vdu_id = None
1796 kdu_name = None
1797 vdu_index = 0
1798 vdu_name = None
1799
1800 # Get additional parameters
1801 deploy_params = {}
1802 if db_nsr.get("additionalParamsForNs"):
1803 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1804 base_folder = nsd["_admin"]["storage"]
1805 self._deploy_n2vc(
1806 logging_text=logging_text,
1807 db_nsr=db_nsr,
1808 db_vnfr=db_vnfr,
1809 nslcmop_id=nslcmop_id,
1810 nsr_id=nsr_id,
1811 nsi_id=nsi_id,
1812 vnfd_id=vnfd_id,
1813 vdu_id=vdu_id,
1814 kdu_name=kdu_name,
1815 member_vnf_index=member_vnf_index,
1816 vdu_index=vdu_index,
1817 vdu_name=vdu_name,
1818 deploy_params=deploy_params,
1819 descriptor_config=descriptor_config,
1820 base_folder=base_folder,
1821 task_instantiation_info=tasks_dict_info,
1822 stage=stage
1823 )
1824
1825 # rest of staff will be done at finally
1826
1827 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1828 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1829 exc = e
1830 except asyncio.CancelledError:
1831 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1832 exc = "Operation was cancelled"
1833 except Exception as e:
1834 exc = traceback.format_exc()
1835 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1836 finally:
1837 if exc:
1838 error_list.append(str(exc))
1839 try:
1840 # wait for pending tasks
1841 if tasks_dict_info:
1842 stage[1] = "Waiting for instantiate pending tasks."
1843 self.logger.debug(logging_text + stage[1])
1844 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1845 stage, nslcmop_id, nsr_id=nsr_id)
1846 stage[1] = stage[2] = ""
1847 except asyncio.CancelledError:
1848 error_list.append("Cancelled")
1849 # TODO cancel all tasks
1850 except Exception as exc:
1851 error_list.append(str(exc))
1852
1853 # update operation-status
1854 db_nsr_update["operational-status"] = "running"
1855 # let's begin with VCA 'configured' status (later we can change it)
1856 db_nsr_update["config-status"] = "configured"
1857 for task, task_name in tasks_dict_info.items():
1858 if not task.done() or task.cancelled() or task.exception():
1859 if task_name.startswith(self.task_name_deploy_vca):
1860 # A N2VC task is pending
1861 db_nsr_update["config-status"] = "failed"
1862 else:
1863 # RO or KDU task is pending
1864 db_nsr_update["operational-status"] = "failed"
1865
1866 # update status at database
1867 if error_list:
1868 error_detail = ". ".join(error_list)
1869 self.logger.error(logging_text + error_detail)
1870 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1871 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1872
1873 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1874 db_nslcmop_update["detailed-status"] = error_detail
1875 nslcmop_operation_state = "FAILED"
1876 ns_state = "BROKEN"
1877 else:
1878 error_detail = None
1879 error_description_nsr = error_description_nslcmop = None
1880 ns_state = "READY"
1881 db_nsr_update["detailed-status"] = "Done"
1882 db_nslcmop_update["detailed-status"] = "Done"
1883 nslcmop_operation_state = "COMPLETED"
1884
1885 if db_nsr:
1886 self._write_ns_status(
1887 nsr_id=nsr_id,
1888 ns_state=ns_state,
1889 current_operation="IDLE",
1890 current_operation_id=None,
1891 error_description=error_description_nsr,
1892 error_detail=error_detail,
1893 other_update=db_nsr_update
1894 )
1895 if db_nslcmop:
1896 self._write_op_status(
1897 op_id=nslcmop_id,
1898 stage="",
1899 error_message=error_description_nslcmop,
1900 operation_state=nslcmop_operation_state,
1901 other_update=db_nslcmop_update,
1902 )
1903
1904 if nslcmop_operation_state:
1905 try:
1906 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1907 "operationState": nslcmop_operation_state},
1908 loop=self.loop)
1909 except Exception as e:
1910 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1911
1912 self.logger.debug(logging_text + "Exit")
1913 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1914
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations declared at the descriptors in which one deployed VCA takes part.

    Steps:
      1. find all relations (ns-configuration of the nsd and vnf-configuration of every vnfd) for this VCA
      2. wait until both peers of a relation have its configuration software installed
      3. add the relation by means of N2VC

    A relation is discarded, without being added, when the 'configurationStatus' entry of one of its
    peers is 'BROKEN'.

    :param logging_text: prefix to use at the log messages
    :param nsr_id: _id of the ns record at database collection "nsrs"
    :param vca_index: index of this VCA at the list db_nsr._admin.deployed.VCA
    :param timeout: maximum time in seconds to wait until no relation is pending
    :return: True when no relation is left pending (all of them added or discarded). False in case of
        timeout or any other error, that is logged but not raised
    """

    def _takes_part(relation, entity_id):
        # a relation contains two entities; checks if entity_id is one of them
        entities = relation.get('entities')
        return entity_id in (entities[0].get('id'), entities[1].get('id'))

    async def _process_pending(pending, vca_key, db_nsr):
        """
        Try to add every relation of 'pending', removing from this list the ones added or discarded
        :param pending: list of relations not added yet. It is modified
        :param vca_key: key of the VCA entry compared against the entity id of the relation
        :param db_nsr: ns record just read from database
        """
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA')) or []
        vca_status_list = db_nsr.get('configurationStatus') or []

        # iterate over a copy, because entries are removed from 'pending' inside the loop
        for r in list(pending):
            entities = r.get('entities')
            from_id = entities[0].get('id')
            to_id = entities[1].get('id')
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # the list can contain empty entries; only VCAs with the software installed are valid peers
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=entities[0].get('endpoint'),
                    endpoint_2=entities[1].get('endpoint'))
                # remove entry from relations list
                pending.remove(r)
                continue

            # check failed peers. VCA and configurationStatus lists are paired by position
            for vca, vca_status in zip(vca_list, vca_status_list):
                if not vca or not vca_status:
                    continue
                if vca.get(vca_key) in (from_id, to_id) and vca_status.get('status') == 'BROKEN':
                    # peer broken: remove relation from list (only once, even if both peers are broken)
                    pending.remove(r)
                    break

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is present
        db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
        ns_relations = [r for r in db_ns_relations if _takes_part(r, my_vca.get('member-vnf-index'))]

        # read all vnf-configuration relations where this VCA is present
        vnf_relations = []
        for vnfd in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
            db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()
            vnf_relations += [r for r in db_vnf_relations if _takes_part(r, my_vca.get('vdu_id'))]

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: wait for peers and add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (other tasks update the record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # NS relations are matched by member-vnf-index, VNF relations are matched by vdu_id
            await _process_pending(ns_relations, 'member-vnf-index', db_nsr)
            await _process_pending(vnf_relations, 'vdu_id', db_nsr)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2074
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the installation of every KDU present at the 'kdur' list of the vnf records.

    For each kdur an entry is written at database nsrs._admin.deployed.K8s.<index>, and an asyncio task
    running the 'install' of the proper connector (self.k8scluster_map[<type>]) is created and registered.
    The tasks are not awaited here; they are added to 'task_instantiation_info'.

    :param logging_text: prefix to use at the log messages
    :param nsr_id: _id of the ns record at database collection "nsrs"
    :param nslcmop_id: _id of the operation, used to register the created tasks
    :param db_vnfrs: dictionary of vnf records. Only its values are used
    :param db_vnfds: dictionary of vnfds indexed by vnfd _id, used to locate the package storage
    :param task_instantiation_info: dictionary to fill with task: "Deploying KDU <kdu-name>"
    :return: None
    :raises LcmException: if the k8s cluster is not found or not initialized, if the kdur is neither
        helm-chart nor juju-bundle, or wrapping any other unexpected exception (asyncio.CancelledError
        is propagated as it is)
    """
    # cache, per cluster type, of the id stored at k8sclusters._admin.<cluster_type>.id indexed by cluster _id
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # returns the id of the cluster for this cluster_type, reading database only the first time
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    # bound before the 'try' so that it is always available at the 'finally'
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))

                # check if kdumodel is a file and exists. Best effort: on any problem kdumodel is kept as it is
                try:
                    storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # repos of a helm cluster are synchronized only once per call
                if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                        updated = {'_admin.helm_charts_added.' +
                                   item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                                         "to_add: {}".format(k8s_cluster_id, del_repo_list,
                                                                             added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                    updated_cluster_list.append(cluster_uuid)

                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)

                # entry created with "kdu-instance" as None before launching the installation
                k8s_instace_info = {"kdu-instance": None,
                                    "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                    "kdu-name": kdur["kdu-name"],
                                    "kdu-model": kdumodel}
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # database location of this KDU entry, passed to the connector
                db_dict = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}

                task = asyncio.ensure_future(
                    self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                                atomic=True, params=desc_params,
                                                                db_dict=db_dict, timeout=600,
                                                                kdu_name=kdur["kdu-name"]))

                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg) from e
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2187
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch instantiate_N2VC in an asyncio task and register the task object.

    The information of this charm is looked for at db_nsr._admin.deployed.VCA; if not found, a new entry
    is created at the end of the list, both at database and at the db_nsr dictionary.

    :param db_nsr: ns record. Its _admin.deployed.VCA list is read and may be appended
    :param task_instantiation_info: dictionary where the created task is added with a descriptive text
    :return: None
    """
    deployed_vcas = db_nsr["_admin"]["deployed"]["VCA"]

    def _is_this_charm(entry):
        # empty entries are ignored; the rest are identified by these four fields
        return bool(entry) and \
            entry.get("member-vnf-index") == member_vnf_index and \
            entry.get("vdu_id") == vdu_id and \
            entry.get("kdu_name") == kdu_name and \
            entry.get("vdu_count_index", 0) == vdu_index

    # fill db_nsr._admin.deployed.VCA.<index>
    vca_index = next((position for position, entry in enumerate(deployed_vcas) if _is_this_charm(entry)), None)
    if vca_index is None:
        # not found, create one at the end of the list
        vca_index = len(deployed_vcas)
        vca_deployed = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",  # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }

        # create VCA and configurationStatus in db
        self.update_db_2("nsrs", nsr_id, {
            "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
            "configurationStatus.{}".format(vca_index): dict()
        })

        deployed_vcas.append(vca_deployed)

    # Launch task
    n2vc_coroutine = self.instantiate_N2VC(
        logging_text=logging_text,
        vca_index=vca_index,
        nsi_id=nsi_id,
        db_nsr=db_nsr,
        db_vnfr=db_vnfr,
        vdu_id=vdu_id,
        kdu_name=kdu_name,
        vdu_index=vdu_index,
        deploy_params=deploy_params,
        config_descriptor=descriptor_config,
        base_folder=base_folder,
        nslcmop_id=nslcmop_id,
        stage=stage
    )
    task_n2vc = asyncio.ensure_future(n2vc_coroutine)
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
    task_description = self.task_name_deploy_vca + " {}.{}".format(member_vnf_index or "", vdu_id or "")
    task_instantiation_info[task_n2vc] = task_description
2250
2251 # Check if this VNFD has a configured terminate action
2252 def _has_terminate_config_primitive(self, vnfd):
2253 vnf_config = vnfd.get("vnf-configuration")
2254 if vnf_config and vnf_config.get("terminate-config-primitive"):
2255 return True
2256 else:
2257 return False
2258
2259 @staticmethod
2260 def _get_terminate_config_primitive_seq_list(vnfd):
2261 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2262 # No need to check for existing primitive twice, already done before
2263 vnf_config = vnfd.get("vnf-configuration")
2264 seq_list = vnf_config.get("terminate-config-primitive")
2265 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2266 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2267 return seq_list_sorted
2268
2269 @staticmethod
2270 def _create_nslcmop(nsr_id, operation, params):
2271 """
2272 Creates a ns-lcm-opp content to be stored at database.
2273 :param nsr_id: internal id of the instance
2274 :param operation: instantiate, terminate, scale, action, ...
2275 :param params: user parameters for the operation
2276 :return: dictionary following SOL005 format
2277 """
2278 # Raise exception if invalid arguments
2279 if not (nsr_id and operation and params):
2280 raise LcmException(
2281 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2282 now = time()
2283 _id = str(uuid4())
2284 nslcmop = {
2285 "id": _id,
2286 "_id": _id,
2287 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2288 "operationState": "PROCESSING",
2289 "statusEnteredTime": now,
2290 "nsInstanceId": nsr_id,
2291 "lcmOperationType": operation,
2292 "startTime": now,
2293 "isAutomaticInvocation": False,
2294 "operationParams": params,
2295 "isCancelPending": False,
2296 "links": {
2297 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2298 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2299 }
2300 }
2301 return nslcmop
2302
2303 def _format_additional_params(self, params):
2304 params = params or {}
2305 for key, value in params.items():
2306 if str(value).startswith("!!yaml "):
2307 params[key] = yaml.safe_load(value[7:])
2308 return params
2309
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Get the parameters for one terminate primitive, as returned by self._map_primitive_params
    :param seq: entry of the terminate-config-primitive list. Its 'name' is used as primitive name
    :param vnf_index: member vnf index, set as "member_vnf_index"
    :return: what self._map_primitive_params returns for this primitive, with no user params provided
        and an empty dictionary of descriptor params
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        "primitive_params": {},
    }
    return self._map_primitive_params(seq, params, {})
2320
2321 # sub-operations
2322
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide what to do with an already existing sub-operation
    :param db_nslcmop: operation record, containing the sub-operations at _admin.operations
    :param op_index: index of the sub-operation at that list
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'; otherwise op_index, after
        setting its status to 'PROCESSING' by means of _update_suboperation_status
    """
    suboperation = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
    if suboperation.get('operationState') != 'COMPLETED':
        # c. Reintent executing sub-operation
        # The sub-operation exists and it is not completed: mark it as 'PROCESSING' to indicate a reintent.
        self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
2341
2342 # Find a sub-operation where all keys in a matching dictionary must match
2343 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2344 def _find_suboperation(self, db_nslcmop, match):
2345 if (db_nslcmop and match):
2346 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2347 for i, op in enumerate(op_list):
2348 if all(op.get(k) == match[k] for k in match):
2349 return i
2350 return self.SUBOPERATION_STATUS_NOT_FOUND
2351
# Write at database the status of one sub-operation, identified by its index
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    :param db_nslcmop: operation record; only its '_id' is used, the dictionary is not modified
    :param op_index: index of the sub-operation at _admin.operations
    :param operationState: value to store at 'operationState' of the sub-operation
    :param detailed_status: value to store at 'detailed-status' of the sub-operation
    :return: None
    """
    # Update DB for HA tasks
    changes = {
        '_admin.operations.{}.operationState'.format(op_index): operationState,
        '_admin.operations.{}.detailed-status'.format(op_index): detailed_status,
    }
    self.db.set_one("nslcmops", q_filter={'_id': db_nslcmop['_id']}, update_dict=changes, fail_on_empty=False)
2362
2363 # Add sub-operation, return the index of the added sub-operation
2364 # Optionally, set operationState, detailed-status, and operationType
2365 # Status and type are currently set for 'scale' sub-operations:
2366 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2367 # 'detailed-status' : status message
2368 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2369 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2370 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2371 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2372 RO_nsr_id=None, RO_scaling_info=None):
2373 if not db_nslcmop:
2374 return self.SUBOPERATION_STATUS_NOT_FOUND
2375 # Get the "_admin.operations" list, if it exists
2376 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2377 op_list = db_nslcmop_admin.get('operations')
2378 # Create or append to the "_admin.operations" list
2379 new_op = {'member_vnf_index': vnf_index,
2380 'vdu_id': vdu_id,
2381 'vdu_count_index': vdu_count_index,
2382 'primitive': primitive,
2383 'primitive_params': mapped_primitive_params}
2384 if operationState:
2385 new_op['operationState'] = operationState
2386 if detailed_status:
2387 new_op['detailed-status'] = detailed_status
2388 if operationType:
2389 new_op['lcmOperationType'] = operationType
2390 if RO_nsr_id:
2391 new_op['RO_nsr_id'] = RO_nsr_id
2392 if RO_scaling_info:
2393 new_op['RO_scaling_info'] = RO_scaling_info
2394 if not op_list:
2395 # No existing operations, create key 'operations' with current operation as first list element
2396 db_nslcmop_admin.update({'operations': [new_op]})
2397 op_list = db_nslcmop_admin.get('operations')
2398 else:
2399 # Existing operations, append operation to list
2400 op_list.append(new_op)
2401
2402 db_nslcmop_update = {'_admin.operations': op_list}
2403 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2404 op_index = len(op_list) - 1
2405 return op_index
2406
2407 # Helper methods for scale() sub-operations
2408
# pre-scale/post-scale:
# Three different cases are considered:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
# c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    :param db_nslcmop: operation record, containing the sub-operations at _admin.operations
    :param operationType: ignored and replaced by 'SCALE-RO' when both RO_nsr_id and RO_scaling_info
        are provided
    :return: SUBOPERATION_STATUS_NEW if the sub-operation did not exist (it is added); otherwise what
        _retry_or_skip_suboperation returns for the existing one
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,  # vdu_id: None for all kind of scaling
        None,  # vdu_count_index: None for all kind of scaling
        None,  # vdu_name: None for all kind of scaling
        vnf_config_primitive,
        primitive_params,
        'PROCESSING',  # initial status for sub-operation
        'In progress',
        operationType,
        RO_nsr_id,
        RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2467
2468 # Function to return execution_environment id
2469
2470 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2471 # TODO vdu_index_count
2472 for vca in vca_deployed_list:
2473 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2474 return vca["ee_id"]
2475
2476 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
2477 """
2478 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2479 :param logging_text:
2480 :param db_nslcmop:
2481 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2482 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2483 :param vca_index: index in the database _admin.deployed.VCA
2484 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2485 :return: None or exception
2486 """
2487 # execute terminate_primitives
2488 terminate_primitives = config_descriptor.get("terminate-config-primitive")
2489 vdu_id = vca_deployed.get("vdu_id")
2490 vdu_count_index = vca_deployed.get("vdu_count_index")
2491 vdu_name = vca_deployed.get("vdu_name")
2492 vnf_index = vca_deployed.get("member-vnf-index")
2493 if terminate_primitives and vca_deployed.get("needed_terminate"):
2494 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2495 terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
2496 for seq in terminate_primitives:
2497 # For each sequence in list, get primitive and call _ns_execute_primitive()
2498 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2499 vnf_index, seq.get("name"))
2500 self.logger.debug(logging_text + step)
2501 # Create the primitive for each sequence, i.e. "primitive": "touch"
2502 primitive = seq.get('name')
2503 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2504 # The following 3 parameters are currently set to None for 'terminate':
2505 # vdu_id, vdu_count_index, vdu_name
2506
2507 # Add sub-operation
2508 self._add_suboperation(db_nslcmop,
2509 vnf_index,
2510 vdu_id,
2511 vdu_count_index,
2512 vdu_name,
2513 primitive,
2514 mapped_primitive_params)
2515 # Sub-operations: Call _ns_execute_primitive() instead of action()
2516 try:
2517 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2518 mapped_primitive_params)
2519 except LcmException:
2520 # this happens when VCA is not deployed. In this case it is not needed to terminate
2521 continue
2522 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2523 if result not in result_ok:
2524 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2525 "error {}".format(seq.get("name"), vnf_index, result_detail))
2526 # set that this VCA do not need terminated
2527 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2528 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2529
2530 if destroy_ee:
2531 await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2532
2533 async def _delete_all_N2VC(self, db_nsr: dict):
2534 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2535 namespace = "." + db_nsr["_id"]
2536 try:
2537 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2538 except N2VCNotFound: # already deleted. Skip
2539 pass
2540 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2541
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr database record to update
    :param nslcmop_id: id of the nslcmop database record to update
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with all the collected error details when something could not be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                             ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # Explicit raise instead of 'assert False': asserts are removed when running with -O,
                    # and the loop would silently keep polling until the timeout
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                # write to database only when the reported status changes
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # 'loop' argument is not passed: it was removed from asyncio.sleep() in Python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd; skipped when the ns deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds; skipped when any of the previous deletions has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2684
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance: execute the terminate primitives, delete the execution environments,
    uninstall the KDUs and remove the deployment from RO.
    The final status is always written to database in the 'finally' section, also when the method
    returns early or an exception is captured.
    :param nsr_id: id of the nsr database record
    :param nslcmop_id: id of the nslcmop database record of this operation
    :return: None. Errors are not raised; they are recorded in the nsr and nslcmop database records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}  # maps every launched task to the text used to report it
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        # the operation can override the default global timeout
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        # nothing to remove; the 'finally' section below still runs and writes the final status
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs; every vnfd is read from database only once
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        if not operation_params.get("skip_terminate_primitives"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
                    continue
                # select the configuration descriptor according to the level of this VCA: ns, vdu, kdu or vnf
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                # NOTE(review): config_descriptor is still None here when the vdu/kdu or the ns-configuration
                # is not found - confirm that destroy_N2VC accepts it
                # last argument (destroy_ee) is False: execution environments are deleted all at once at stage 3
                task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
                                                               vca_index, False))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                # on errors stage 3 is not executed; the 'finally' section reports the failure
                if error_list:
                    return   # raise LcmException("; ".join(error_list))
                tasks_dict_info.clear()

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                # unknown cluster type: it is only logged, it is not added to the error list
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        task_delete_ro = asyncio.ensure_future(
            self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # the rest of the work (waiting for the launched tasks) is done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected exception: the complete traceback is stored as error
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # database records are written only when they were read successfully
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        if db_nslcmop:
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            # notify the result through the message bus; a failure here is only logged
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2900
2901 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
2902 time_start = time()
2903 error_detail_list = []
2904 error_list = []
2905 pending_tasks = list(created_tasks_info.keys())
2906 num_tasks = len(pending_tasks)
2907 num_done = 0
2908 stage[1] = "{}/{}.".format(num_done, num_tasks)
2909 self._write_op_status(nslcmop_id, stage)
2910 while pending_tasks:
2911 new_error = None
2912 _timeout = timeout + time_start - time()
2913 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
2914 return_when=asyncio.FIRST_COMPLETED)
2915 num_done += len(done)
2916 if not done: # Timeout
2917 for task in pending_tasks:
2918 new_error = created_tasks_info[task] + ": Timeout"
2919 error_detail_list.append(new_error)
2920 error_list.append(new_error)
2921 break
2922 for task in done:
2923 if task.cancelled():
2924 exc = "Cancelled"
2925 else:
2926 exc = task.exception()
2927 if exc:
2928 if isinstance(exc, asyncio.TimeoutError):
2929 exc = "Timeout"
2930 new_error = created_tasks_info[task] + ": {}".format(exc)
2931 error_list.append(created_tasks_info[task])
2932 error_detail_list.append(new_error)
2933 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
2934 self.logger.error(logging_text + new_error)
2935 else:
2936 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
2937 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
2938 else:
2939 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
2940 stage[1] = "{}/{}.".format(num_done, num_tasks)
2941 if new_error:
2942 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
2943 if nsr_id: # update also nsr
2944 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
2945 "errorDetail": ". ".join(error_detail_list)})
2946 self._write_op_status(nslcmop_id, stage)
2947 return error_detail_list
2948
2949 @staticmethod
2950 def _map_primitive_params(primitive_desc, params, instantiation_params):
2951 """
2952 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2953 The default-value is used. If it is between < > it look for a value at instantiation_params
2954 :param primitive_desc: portion of VNFD/NSD that describes primitive
2955 :param params: Params provided by user
2956 :param instantiation_params: Instantiation params provided by user
2957 :return: a dictionary with the calculated params
2958 """
2959 calculated_params = {}
2960 for parameter in primitive_desc.get("parameter", ()):
2961 param_name = parameter["name"]
2962 if param_name in params:
2963 calculated_params[param_name] = params[param_name]
2964 elif "default-value" in parameter or "value" in parameter:
2965 if "value" in parameter:
2966 calculated_params[param_name] = parameter["value"]
2967 else:
2968 calculated_params[param_name] = parameter["default-value"]
2969 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2970 and calculated_params[param_name].endswith(">"):
2971 if calculated_params[param_name][1:-1] in instantiation_params:
2972 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2973 else:
2974 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2975 format(calculated_params[param_name], primitive_desc["name"]))
2976 else:
2977 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2978 format(param_name, primitive_desc["name"]))
2979
2980 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2981 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2982 width=256)
2983 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2984 calculated_params[param_name] = calculated_params[param_name][7:]
2985
2986 # add always ns_config_info if primitive name is config
2987 if primitive_desc["name"] == "config":
2988 if "ns_config_info" in instantiation_params:
2989 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2990 return calculated_params
2991
2992 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
2993 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
2994 for vca in deployed_vca:
2995 if not vca:
2996 continue
2997 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
2998 continue
2999 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3000 continue
3001 if kdu_name and kdu_name != vca["kdu_name"]:
3002 continue
3003 break
3004 else:
3005 # vca_deployed not found
3006 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3007 "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3008
3009 # get ee_id
3010 ee_id = vca.get("ee_id")
3011 if not ee_id:
3012 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3013 "execution environment"
3014 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3015 return ee_id
3016
3017 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3018 retries_interval=30, timeout=None) -> (str, str):
3019 try:
3020 if primitive == "config":
3021 primitive_params = {"params": primitive_params}
3022
3023 while retries >= 0:
3024 try:
3025 output = await asyncio.wait_for(
3026 self.n2vc.exec_primitive(
3027 ee_id=ee_id,
3028 primitive_name=primitive,
3029 params_dict=primitive_params,
3030 progress_timeout=self.timeout_progress_primitive,
3031 total_timeout=self.timeout_primitive),
3032 timeout=timeout or self.timeout_primitive)
3033 # execution was OK
3034 break
3035 except asyncio.CancelledError:
3036 raise
3037 except Exception as e: # asyncio.TimeoutError
3038 if isinstance(e, asyncio.TimeoutError):
3039 e = "Timeout"
3040 retries -= 1
3041 if retries >= 0:
3042 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3043 # wait and retry
3044 await asyncio.sleep(retries_interval, loop=self.loop)
3045 else:
3046 return 'FAILED', str(e)
3047
3048 return 'COMPLETED', output
3049
3050 except (LcmException, asyncio.CancelledError):
3051 raise
3052 except Exception as e:
3053 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3054
3055 async def action(self, nsr_id, nslcmop_id):
3056
3057 # Try to lock HA task here
3058 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3059 if not task_is_locked_by_me:
3060 return
3061
3062 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3063 self.logger.debug(logging_text + "Enter")
3064 # get all needed from database
3065 db_nsr = None
3066 db_nslcmop = None
3067 db_nsr_update = {}
3068 db_nslcmop_update = {}
3069 nslcmop_operation_state = None
3070 error_description_nslcmop = None
3071 exc = None
3072 try:
3073 # wait for any previous tasks in process
3074 step = "Waiting for previous operations to terminate"
3075 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3076
3077 self._write_ns_status(
3078 nsr_id=nsr_id,
3079 ns_state=None,
3080 current_operation="RUNNING ACTION",
3081 current_operation_id=nslcmop_id
3082 )
3083
3084 step = "Getting information from database"
3085 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3086 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3087
3088 nsr_deployed = db_nsr["_admin"].get("deployed")
3089 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3090 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3091 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3092 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3093 primitive = db_nslcmop["operationParams"]["primitive"]
3094 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3095 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3096
3097 if vnf_index:
3098 step = "Getting vnfr from database"
3099 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3100 step = "Getting vnfd from database"
3101 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3102 else:
3103 step = "Getting nsd from database"
3104 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3105
3106 # for backward compatibility
3107 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3108 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3109 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3110 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3111
3112 # look for primitive
3113 config_primitive_desc = None
3114 if vdu_id:
3115 for vdu in get_iterable(db_vnfd, "vdu"):
3116 if vdu_id == vdu["id"]:
3117 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3118 if config_primitive["name"] == primitive:
3119 config_primitive_desc = config_primitive
3120 break
3121 break
3122 elif kdu_name:
3123 for kdu in get_iterable(db_vnfd, "kdu"):
3124 if kdu_name == kdu["name"]:
3125 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3126 if config_primitive["name"] == primitive:
3127 config_primitive_desc = config_primitive
3128 break
3129 break
3130 elif vnf_index:
3131 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3132 if config_primitive["name"] == primitive:
3133 config_primitive_desc = config_primitive
3134 break
3135 else:
3136 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3137 if config_primitive["name"] == primitive:
3138 config_primitive_desc = config_primitive
3139 break
3140
3141 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3142 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3143 format(primitive))
3144
3145 if vnf_index:
3146 if vdu_id:
3147 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3148 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3149 elif kdu_name:
3150 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3151 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3152 else:
3153 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3154 else:
3155 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3156
3157 # TODO check if ns is in a proper status
3158 if kdu_name and primitive in ("upgrade", "rollback", "status"):
3159 # kdur and desc_params already set from before
3160 if primitive_params:
3161 desc_params.update(primitive_params)
3162 # TODO Check if we will need something at vnf level
3163 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3164 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3165 break
3166 else:
3167 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3168
3169 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3170 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3171 raise LcmException(msg)
3172
3173 db_dict = {"collection": "nsrs",
3174 "filter": {"_id": nsr_id},
3175 "path": "_admin.deployed.K8s.{}".format(index)}
3176 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3177 step = "Executing kdu {}".format(primitive)
3178 if primitive == "upgrade":
3179 if desc_params.get("kdu_model"):
3180 kdu_model = desc_params.get("kdu_model")
3181 del desc_params["kdu_model"]
3182 else:
3183 kdu_model = kdu.get("kdu-model")
3184 parts = kdu_model.split(sep=":")
3185 if len(parts) == 2:
3186 kdu_model = parts[0]
3187
3188 detailed_status = await asyncio.wait_for(
3189 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3190 cluster_uuid=kdu.get("k8scluster-uuid"),
3191 kdu_instance=kdu.get("kdu-instance"),
3192 atomic=True, kdu_model=kdu_model,
3193 params=desc_params, db_dict=db_dict,
3194 timeout=timeout_ns_action),
3195 timeout=timeout_ns_action + 10)
3196 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3197 elif primitive == "rollback":
3198 detailed_status = await asyncio.wait_for(
3199 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3200 cluster_uuid=kdu.get("k8scluster-uuid"),
3201 kdu_instance=kdu.get("kdu-instance"),
3202 db_dict=db_dict),
3203 timeout=timeout_ns_action)
3204 elif primitive == "status":
3205 detailed_status = await asyncio.wait_for(
3206 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3207 cluster_uuid=kdu.get("k8scluster-uuid"),
3208 kdu_instance=kdu.get("kdu-instance")),
3209 timeout=timeout_ns_action)
3210
3211 if detailed_status:
3212 nslcmop_operation_state = 'COMPLETED'
3213 else:
3214 detailed_status = ''
3215 nslcmop_operation_state = 'FAILED'
3216
3217 else:
3218 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3219 self._look_for_deployed_vca(nsr_deployed["VCA"],
3220 member_vnf_index=vnf_index,
3221 vdu_id=vdu_id,
3222 vdu_count_index=vdu_count_index),
3223 primitive=primitive,
3224 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3225 timeout=timeout_ns_action)
3226
3227 db_nslcmop_update["detailed-status"] = detailed_status
3228 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3229 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3230 detailed_status))
3231 return # database update is called inside finally
3232
3233 except (DbException, LcmException, N2VCException, K8sException) as e:
3234 self.logger.error(logging_text + "Exit Exception {}".format(e))
3235 exc = e
3236 except asyncio.CancelledError:
3237 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3238 exc = "Operation was cancelled"
3239 except asyncio.TimeoutError:
3240 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3241 exc = "Timeout"
3242 except Exception as e:
3243 exc = traceback.format_exc()
3244 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3245 finally:
3246 if exc:
3247 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3248 "FAILED {}: {}".format(step, exc)
3249 nslcmop_operation_state = "FAILED"
3250 if db_nsr:
3251 self._write_ns_status(
3252 nsr_id=nsr_id,
3253 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3254 current_operation="IDLE",
3255 current_operation_id=None,
3256 # error_description=error_description_nsr,
3257 # error_detail=error_detail,
3258 other_update=db_nsr_update
3259 )
3260
3261 if db_nslcmop:
3262 self._write_op_status(
3263 op_id=nslcmop_id,
3264 stage="",
3265 error_message=error_description_nslcmop,
3266 operation_state=nslcmop_operation_state,
3267 other_update=db_nslcmop_update,
3268 )
3269
3270 if nslcmop_operation_state:
3271 try:
3272 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3273 "operationState": nslcmop_operation_state},
3274 loop=self.loop)
3275 except Exception as e:
3276 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3277 self.logger.debug(logging_text + "Exit")
3278 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3279 return nslcmop_operation_state, detailed_status
3280
async def scale(self, nsr_id, nslcmop_id):
    """Run a scale-out / scale-in operation (nslcmop) over one VNF of a deployed NS.

    Steps, in order:
      1. take the HA lock for this nslcmop (returns at once if another worker owns it);
      2. read the nslcmop, nsr, vnfr and vnfd records and locate the requested
         scaling-group-descriptor;
      3. run the pre-scale ``scaling-config-action`` primitives matching the direction;
      4. send the ``vdu-scaling`` action to RO and poll until it is done (one hour limit);
      5. run the post-scale ``scaling-config-action`` primitives matching the direction;
      6. in ``finally``: write nsr / nslcmop status to the database, publish a "scaled"
         message on the "ns" topic and remove the task from ``self.lcm_tasks``.

    Failures are not propagated to the caller: they are logged and recorded in the
    nslcmop / nsr records as FAILED.

    :param nsr_id: _id of the ns record ("nsrs" collection)
    :param nslcmop_id: _id of the operation record ("nslcmops" collection)
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # remembered so they can be restored when the operation ends or fails
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        scale_by_step_data = scale_vnf_data["scaleByStepData"]
        vnf_index = scale_by_step_data["member-vnf-index"]
        scaling_group = scale_by_step_data["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work on a copy: the counters are decremented below, the untouched dict is recovered afterwards
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was searched for; the loop variable is the last (non matching)
                        # item, or it is unbound when there are no primitives at all
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as an extra positional argument compared with
                    # the 'SCALE-RO' call further down; the helper signature is not visible here - confirm
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    # no .format() here: the text has no placeholder, and vnf_config_primitive is not bound
                    # unless a pre-scale action was executed before
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: poll the RO action until it becomes ACTIVE
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    else:
                        # second phase: ns_status keeps the value obtained in the first phase; retry until
                        # the vnfr can be updated with the management IP address
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet: try again in the next iteration
                                pass
                        else:
                            assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same argument list as every other _update_suboperation_status call in this method
                    # (an extra nslcmop_id was passed here before)
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was searched for, not the loop variable (see pre-scale)
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): extra positional nslcmop_id compared with the 'SCALE-RO' call - confirm
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update is done inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                if scale_process:
                    # the failure happened in the middle of a VCA or RO step: flag that part as failed
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")