5d4f34d14ad0aa4ceb2f1da078942701aec16f27
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
class NsLcm(LcmBase):
    # Default timeouts. Going by the arithmetic (5 * 60, 2 * 3600) these are presumably seconds - TODO confirm
    timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deploying a ns
    timeout_ns_terminate = 1800  # default global timeout for undeploying a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 10 * 60  # timeout for primitive execution
    timeout_progress_primitive = 2 * 60  # timeout for some progress in a primitive execution

    # Negative sentinel values for sub-operation lookups. They are not used in the visible part of this file;
    # presumably they are returned instead of a (non negative) sub-operation index - verify against the users
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
58
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Stores the database, filesystem storage and messaging handlers and creates the N2VC, K8s and RO clients
    :param db: database handler, passed to the base class and to the connectors
    :param msg: messaging handler, passed to the base class
    :param fs: filesystem storage handler, passed to the base class and to the connectors
    :param lcm_tasks: stored as self.lcm_tasks
    :param config: two level dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read
    :param loop: asyncio loop, handed to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # work over a (shallow) copy of the VCA section
    self.vca_config = config["VCA"].copy()

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    vca_user = self.vca_config.get('user', None)
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca_user,
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db
    )

    # create the K8s connectors; both are given the same kubectl path
    kubectl_path = self.vca_config.get("kubectlpath")
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # every supported type name is mapped to the connector that handles it
    helm_connector, juju_connector = self.k8sclusterhelm, self.k8sclusterjuju
    self.k8scluster_map = {
        "helm-chart": helm_connector,
        "chart": helm_connector,
        "juju-bundle": juju_connector,
        "juju": juju_connector,
    }

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
116
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """
    Writes the RO descriptor into the 'deploymentStatus' field of the ns record. Best effort: errors are
    logged and not propagated
    :param nsrs_id: _id of the entry of the "nsrs" collection to update
    :param ro_descriptor: content stored, without any filtering, under 'deploymentStatus'
    :return: None
    """
    try:
        # TODO filter RO descriptor fields...

        # write to database
        self.update_db_2("nsrs", nsrs_id, {'deploymentStatus': ro_descriptor})

    except Exception as e:
        # Logger.warn() is a deprecated alias that no longer exists in Python 3.13; use warning()
        self.logger.warning('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
132
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback handed to the N2VC connector (on_update_db). Reads the VCA status of the NS and refreshes
    'vcaStatus', the configurationStatus of the affected VCA and, when the NS is READY or DEGRADED, 'nsState'
    and 'errorDescription' of the ns record. Best effort: errors are logged and not propagated
    :param table: not used, the "nsrs" collection is always the one read and written
    :param filter: database filter used to read the ns record. Its '_id' is taken as the ns id
    :param path: dotted path of the updated element. When it ends with an integer, this is taken as the index
        of the VCA at _admin.deployed.VCA and at configurationStatus
    :param updated_data: not used
    :return: None
    """
    # remove last dot from path (if exists)
    if path.endswith('.'):
        path = path[:-1]

    # bound before the try block, so that the final except clause can always report it
    nsr_id = None
    try:
        nsr_id = filter.get('_id')

        # read ns record from database
        nsr = self.db.get_one(table='nsrs', q_filter=filter)
        current_ns_status = nsr.get('nsState')

        # get vca status for NS
        status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)

        # vcaStatus
        db_dict = {'vcaStatus': status_dict}

        # update configurationStatus for this VCA
        try:
            vca_index = int(path[path.rfind(".") + 1:])

            vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
            vca_status = vca_list[vca_index].get('status')

            configuration_status_list = nsr.get('configurationStatus')
            config_status = configuration_status_list[vca_index].get('status')

            # A dotted key is used to modify only the 'status' of this element, as done for "vdur.N.status"
            # in other updates of this file. Indexing db_dict['configurationStatus'] raised always a KeyError
            # (silently caught below), so the status was never written
            config_status_key = 'configurationStatus.{}.status'.format(vca_index)
            if config_status == 'BROKEN' and vca_status != 'failed':
                db_dict[config_status_key] = 'READY'
            elif config_status != 'BROKEN' and vca_status == 'failed':
                db_dict[config_status_key] = 'BROKEN'
        except Exception as e:
            # not update configurationStatus (e.g. path does not end with a VCA index)
            self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        if current_ns_status in ('READY', 'DEGRADED'):
            errors = []
            # check machines
            for machine_id, machine in (status_dict.get('machines') or {}).items():
                # check machine agent-status
                if machine.get('agent-status'):
                    s = machine.get('agent-status').get('status')
                    if s != 'started':
                        errors.append('machine {} agent-status={} ; '.format(machine_id, s))
                # check machine instance status
                if machine.get('instance-status'):
                    s = machine.get('instance-status').get('status')
                    if s != 'running':
                        errors.append('machine {} instance-status={} ; '.format(machine_id, s))
            # check applications
            for app_id, app in (status_dict.get('applications') or {}).items():
                # check application status
                if app.get('status'):
                    s = app.get('status').get('status')
                    if s != 'active':
                        errors.append('application {} status={} ; '.format(app_id, s))

            # the NS is degraded if, and only if, some error has been collected
            is_degraded = bool(errors)
            if is_degraded:
                db_dict['errorDescription'] = ''.join(errors)
            if current_ns_status == 'READY' and is_degraded:
                db_dict['nsState'] = 'DEGRADED'
            if current_ns_status == 'DEGRADED' and not is_degraded:
                db_dict['nsState'] = 'READY'

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except Exception as e:
        # Logger.warn() is a deprecated alias that no longer exists in Python 3.13; use warning()
        self.logger.warning('Error updating NS state for ns={}: {}'.format(nsr_id, e))
221
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The cloud-init (or the content of
    cloud-init-file) of every vdu is rendered with Jinja2 using the instantiation parameters
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Must contain every variable used at the
        cloud-init templates
    :param nsrId: Id of the NSR. Not used in this method
    :return: copy of vnfd without the keys not used by RO. LcmException is raised if the cloud-init file
        cannot be read, the template cannot be parsed or a template variable is not provided
    """
    # remove unused by RO configuration, monitoring, scaling and internal keys
    keys_unused_by_RO = ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                         "kdu", "k8s-cluster")
    try:
        vnfd_RO = deepcopy(vnfd)
        for unused_key in keys_unused_by_RO:
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file is looked for at the package storage of the original vnfd (vnfd_RO has lost _admin)
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable referenced by the template is mandatory
            parsed_template = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_template):
                if additionalParams and var in additionalParams.keys():
                    continue
                raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                   "file, must be provided in the instantiation parameters inside the "
                                   "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))

            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
279
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its "constituent-vnfd" list is used to map member-vnf-index to vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: value stored as "mgmt_keys" of every vdu that requires ssh access. An empty list is
        used if None
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM/WIM account
        is not ENABLED or an instantiation parameter references something not present at the descriptors
    """
    # caches of OSM account id -> RO id, to read every account from database only once
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # returns the RO id of an OSM vim account, checking that it is ENABLED
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # returns the RO id of an OSM wim account, checking that it is ENABLED. Anything that is not a str
        # (e.g. None) is returned unchanged
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the keys and values renamed to the ones used at RO:
        # dns-server -> dns-address, ipv4/ipv6 -> IPv4/IPv6, dhcp-params -> dhcp
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the keys (n2vc_key_list) at every vdu whose configuration declares ssh-access as required
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                # the vdu of the mgmt-interface is given directly (vdu-id) or by a connection point (cp)
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # look for the vdu that owns the management connection point
                # NOTE(review): vdu.get("interface") is None for a vdu without "interface", which would raise
                # a TypeError here - confirm descriptors always carry it
                for vdu_interface in vdu.get("interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # a vnfd can be used by several constituent-vnfd; the keys are added to all of them
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # instantiation parameters provided per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        # get the descriptor of this vnf (for/else: the else is executed only if nothing matches)
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu of the descriptor has an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # instantiation parameters provided per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            # NOTE(review): the trailing comma only turns this statement into a discarded one-element tuple;
            # harmless, but it looks like a typo
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"])),
        if vld_params.get("vim-network-name"):
            # a dict provides a network per vim account, a str a single network
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            # NOTE(review): this is written at the same "sites" path than vim-network-name above; if both
            # are provided the first value is presumably lost - verify against populate_dict
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                # NOTE(review): every iteration writes the same "use-network" path, so with several entries
                # presumably only the last one is kept
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor keep the values they had when the loops were broken,
                # that is, the vdu and interface attached to the connection point
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
550
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and writes the resulting "vdur" list to database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. It is not modified
    :return: None. LcmException is raised, without writing to database, if some of the requested operations
        cannot be completed because there are not (enough) vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # The list is walked backwards: insertions and deletions do not shift the entries not visited yet, and the
    # first vdur found for a vdu-id-ref is the last one of the list (presumably the one with highest count-index)
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # every new vdur is a copy of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Every dictionary is checked on its own: testing "vdu_create or vdu_delete"
    # first left the scale-in error unreachable, and reported "scaling OUT ... for None" in that case
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
589
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the information that RO reports for its network
    :param ns_update_nsr: dictionary where every updated vld is added under the key "vld.<index>"
    :param db_nsr: content of db_nsr. Its vld entries are modified too
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if a vld has not a matching net at RO
    """
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, None if there is not any
        matching_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld["vim-id"] = matching_net.get("vim_net_id")
        vld["name"] = matching_net.get("vim_name")
        vld["status"] = matching_net.get("status")
        vld["status-detailed"] = matching_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = vld
611
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Sets at database the status "ERROR" to every vnfr, and to those of its vdur without a status yet
    :param db_vnfrs: dictionary whose values are the vnfr contents. The modified vdur are also updated here
    :param error_text: if not empty, its text is stored as "status-detailed" of every vdur set at error
    :return: None. A DbException is logged and not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # a vdur that already has a status is left untouched
                if "status" in vdur:
                    continue
                vdur["status"] = "ERROR"
                vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                if error_text:
                    # database must get the same detail than the in-memory record; it was written the
                    # literal "ERROR" instead of the error text
                    vdur["status-detailed"] = str(error_text)
                    vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = str(error_text)
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
626
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (with their "vms") and "nets" are read
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised if a vnf with vdur has
        no ip address, neither at RO info nor at the vnfr
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf of this member-vnf-index (for/else: the else is executed if it is not found)
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO can provide several addresses separated by ';'. Only the first one is stored
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms of this vdu-id-ref skipped so far, in order to pair the vdur with the
                # vm placed at position "count-index" among the ones with the same vdu_osm_id
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every interface of the vdur must be present at RO info, matched by internal_name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm with this vdu-id-ref and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vld of the vnf, matched with the RO nets by vnf_net_osm_id
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # written only when all the vdur and vld have been found; an exception above leaves the database
            # untouched for this vnfr (the in-memory db_vnfr can be already partially modified)
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
699
def _get_ns_config_info(self, nsr_id):
    """
    Builds the mapping between the vnf and vdu elements of a ns and their N2VC application
    :param nsr_id: id of the nsr. Its _admin.deployed.VCA list is read from database
    :return: a dictionary {"osm-config-mapping": {...}} where the keys of the inner dictionary are:
        "<member-vnf-index>" for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>" for a vdu configuration
        and the values are the "application" of the VCA. The VCA without member-vnf-index are not included
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    mapping = {}
    for vca in db_nsr["_admin"]["deployed"]["VCA"]:
        member_vnf_index = vca["member-vnf-index"]
        if not member_vnf_index:
            # ns level VCA, it has not entry at the mapping
            continue
        vdu_id = vca["vdu_id"]
        application = vca["application"]
        if vdu_id:
            element_key = "{}.{}.{}".format(member_vnf_index, vdu_id, vca["vdu_count_index"])
        else:
            element_key = member_vnf_index
        mapping[element_key] = application
    return {"osm-config-mapping": mapping}
721
@staticmethod
def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
    """
    Builds the list of initial-config-primitive to execute, adding to the ones of the descriptor the internal
    primitives needed: 'config' and 'verify-ssh-credentials'
    :param desc_primitive_list: list of primitives of the descriptor. It can be None or empty. Not modified
    :param vca_deployed: information of the deployed VCA. An empty "member-vnf-index" means that it is related
        to a NS, otherwise to a VNF/VDU. "ssh-public-key" tells if this element contains a ssh public key
    :return: A new list. Can be an empty list, but always a list
    """
    primitive_list = desc_primitive_list.copy() if desc_primitive_list else []

    # position of the first primitive named config. None if not present
    config_position = next(
        (position for position, primitive in enumerate(primitive_list) if primitive["name"] == "config"),
        None)

    if vca_deployed["member-vnf-index"]:
        # for VNF/VDU add verify-ssh-credentials after config
        if config_position is not None and vca_deployed.get("ssh-public-key"):
            primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
    elif config_position is None:
        # for NS, add always a config primitive if not present (bug 874)
        primitive_list.insert(0, {"name": "config", "parameter": []})
    return primitive_list
751
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Deploy the network service at RO: create (or reuse) the vnfd and nsd descriptors, create (or reuse) the ns
    instance, and poll RO until the ns is reported ACTIVE and the VNFRs can be updated.

    :param logging_text: prefix used for logging
    :param nsr_id: id of the "nsrs" record; also used as prefix of the ids created at RO
    :param nsd: ns descriptor; "id", "constituent-vnfd" and "vld" are read
    :param db_nsr: nsrs record; its ["_admin"]["deployed"]["RO"]["vnfd"] list is modified in place
    :param db_nslcmop: nslcmops record; "_id" and "operationParams" are read
    :param db_vnfrs: vnfrs indexed by member-vnf-index
    :param db_vnfds_ref: vnfds indexed by the 'vnfd-id-ref' value used at nsd "constituent-vnfd"
    :param n2vc_key_list: forwarded to ns_params_2_RO
    :param stage: list whose position 2 is overwritten with the current progress text
    :return: None
    :raises ROclient.ROClientException, LcmException, DbException: re-raised after stage[2] is set to an error
        text and set_vnfr_at_error is called. A timeout waiting for the ns is raised as ROClientException
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0   # number of descriptors created at RO
        vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # timeout precedence: operation parameter, then self.timeout["ns_deploy"], then the class default
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        await self.do_placement(logging_text, db_nslcmop, db_vnfrs)

        # deploy RO

        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            # id used at RO: <nsr_id>.<descriptor number>.<member_vnf_index truncated to 23 characters>
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                # not found: reserve a new slot, filled with RO_update below
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            # keep the pending database update and the in-memory record in sync
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # work on a copy: the descriptor sent to RO references the vnfd ids created at RO above
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        # NOTE(review): this status text is the same "Creating nsd=..." string used for the nsd step above,
        # although this step deals with the ns instance - confirm whether that is intended
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                # only a NOT_FOUND answer is tolerated: the stored id is dropped and the ns is created below
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    # an existing ns in ERROR is deleted and created again below
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # wait (up to 3600 seconds) for related tasks on the vim_account of the ns, if there are any
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            # same check for the vim_account given per vnf at the operation parameters
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                    if task_dependency:
                        stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                        self.logger.debug(logging_text + stage[2])
                        await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        # poll RO every 5 seconds until ACTIVE with the VNFRs updated, ERROR, or timeout_ns_deploy expires
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "VIM: ({})".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                    try:
                        self.ns_update_vnfr(db_vnfrs, desc)
                        break
                    except LcmExceptionNoMgmtIP:
                        # management IP not available yet: keep polling
                        pass
                else:
                    assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                # write the progress text to the database only when it has changed
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
            await asyncio.sleep(5, loop=self.loop)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException) as e:
        # mark the vnfrs at error with the exception text and let the caller handle the exception
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
970
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine.
    The loop sleeps 10 seconds per iteration and gives up after 360 iterations.
    :param logging_text: prefix use for logging
    :param nsr_id: id of the "nsrs" record, read to obtain _admin.deployed.RO.nsr_id
    :param vnfr_id: id of the "vnfrs" record that contains the target vdur
    :param vdu_id: vdu-id-ref of the target vdur. If empty, the vdur is looked up by the ip-address of the VNF
    :param vdu_index: count-index of the target vdur (only used when vdu_id is provided)
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: the VNF or the VM is in ERROR status, the vdur is not found, RO answers with an unknown
        response, the key injection fails 20 times, or the iteration limit is reached
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0             # failed attempts of key injection
    target_vdu_id = None     # set once the target vdur has an ip address
    ro_retries = 0           # loop iterations, whatever the reason for waiting is

    while True:

        ro_retries += 1
        # NOTE(review): this limit also ends the wait for the ip address, but the text only mentions the
        # missing _admin.deployed.RO.nsr_id - confirm whether the message should be more generic
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        await asyncio.sleep(10, loop=self.loop)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            # a PDU is accepted whatever its status is; a VM must be ACTIVE
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                # no key is injected to a PDU: log the error and return the address anyway
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # retry at the next iteration; only the first failure is logged
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            # no key to inject: having the ip address is enough
            break

    return ip_address
1072
1073 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1074 """
1075 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1076 """
1077 my_vca = vca_deployed_list[vca_index]
1078 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1079 # vdu or kdu: no dependencies
1080 return
1081 timeout = 300
1082 while timeout >= 0:
1083 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1084 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1085 configuration_status_list = db_nsr["configurationStatus"]
1086 for index, vca_deployed in enumerate(configuration_status_list):
1087 if index == vca_index:
1088 # myself
1089 continue
1090 if not my_vca.get("member-vnf-index") or \
1091 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1092 internal_status = configuration_status_list[index].get("status")
1093 if internal_status == 'READY':
1094 continue
1095 elif internal_status == 'BROKEN':
1096 raise LcmException("Configuration aborted because dependent charm/s has failed")
1097 else:
1098 break
1099 else:
1100 # no dependencies, return
1101 return
1102 await asyncio.sleep(10)
1103 timeout -= 1
1104
1105 raise LcmException("Configuration aborted because dependent charm/s timeout")
1106
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
    """
    Deploy one VCA: create or register its execution environment, install the configuration software, wait for
    the target VM when needed and execute the initial config primitives. The progress is written at
    "configurationStatus.<vca_index>" of the nsrs record.

    :param logging_text: prefix used for logging
    :param vca_index: position of this VCA at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: used as first part of the namespace; empty string is used when not provided
    :param db_nsr: nsrs record; "_id" and ["_admin"]["deployed"]["VCA"] are read
    :param db_vnfr: vnfrs record; when not provided the element is treated as a NS
    :param vdu_id: when provided (together with db_vnfr) the element is treated as a VDU
    :param kdu_name: not referenced inside this method
    :param vdu_index: used, together with vdu_id, at the namespace and when waiting for the VM
    :param config_descriptor: configuration section; "juju", "config-access", "initial-config-primitive" and
        "terminate-config-primitive" are read. Its "initial-config-primitive" list is sorted in place
    :param deploy_params: dictionary; "rw_mgmt_ip" (and "ns_config_info" when the VCA has no member-vnf-index)
        are written on it
    :param base_folder: "folder" and "pkg-dir" are read to build the charm path
    :param nslcmop_id: operation id, used to write the stage
    :param stage: list whose position 0 is overwritten with the current stage text
    :return: None
    :raises LcmException: wraps any Exception, after the configuration status is set to 'BROKEN'
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    # passed to every n2vc call
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""    # description of what is being done; part of the exception text on failure
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace format: <nsi>.<ns>[.<vnfr>[.<vdu_id>-<vdu_index>]]
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless juju.proxy is explicitly False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # not a proxy charm: the VM must be up, as it is registered using its address and a username
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if not is_proxy_charm:
            # the parameters of the "config" primitive, if present, are passed at installation time
            initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
            if initial_config_primitive_list:
                for primitive in initial_config_primitive_list:
                    if primitive["name"] == "config":
                        config = self._map_primitive_params(
                            primitive,
                            {},
                            deploy_params
                        )
                        break
        await self.n2vc.install_configuration_sw(
            ee_id=ee_id,
            artifact_path=artifact_path,
            db_dict=db_dict,
            config=config
        )

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # a failure when sorting is logged and does not abort the deployment
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # the traceback is only logged for exception types that are not in the list below
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1373
1374 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1375 error_description: str = None, error_detail: str = None, other_update: dict = None):
1376 """
1377 Update db_nsr fields.
1378 :param nsr_id:
1379 :param ns_state:
1380 :param current_operation:
1381 :param current_operation_id:
1382 :param error_description:
1383 :param error_detail:
1384 :param other_update: Other required changes at database if provided, will be cleared
1385 :return:
1386 """
1387 try:
1388 db_dict = other_update or {}
1389 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1390 db_dict["_admin.current-operation"] = current_operation_id
1391 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1392 db_dict["currentOperation"] = current_operation
1393 db_dict["currentOperationID"] = current_operation_id
1394 db_dict["errorDescription"] = error_description
1395 db_dict["errorDetail"] = error_detail
1396
1397 if ns_state:
1398 db_dict["nsState"] = ns_state
1399 self.update_db_2("nsrs", nsr_id, db_dict)
1400 except DbException as e:
1401 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1402
1403 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1404 operation_state: str = None, other_update: dict = None):
1405 try:
1406 db_dict = other_update or {}
1407 db_dict['queuePosition'] = queuePosition
1408 if isinstance(stage, list):
1409 db_dict['stage'] = stage[0]
1410 db_dict['detailed-status'] = " ".join(stage)
1411 elif stage is not None:
1412 db_dict['stage'] = str(stage)
1413
1414 if error_message is not None:
1415 db_dict['errorMessage'] = error_message
1416 if operation_state is not None:
1417 db_dict['operationState'] = operation_state
1418 db_dict["statusEnteredTime"] = time()
1419 self.update_db_2("nslcmops", op_id, db_dict)
1420 except DbException as e:
1421 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1422
1423 def _write_all_config_status(self, nsr_id: str, status: str):
1424 try:
1425 # nsrs record
1426 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1427 # configurationStatus
1428 config_status = db_nsr.get('configurationStatus')
1429 if config_status:
1430 # update status
1431 db_dict = dict()
1432 db_dict['configurationStatus'] = list()
1433 for c in config_status:
1434 c['status'] = status
1435 db_dict['configurationStatus'].append(c)
1436 self.update_db_2("nsrs", nsr_id, db_dict)
1437
1438 except DbException as e:
1439 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1440
1441 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1442 element_under_configuration: str = None, element_type: str = None):
1443
1444 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1445 # .format(vca_index, status))
1446
1447 try:
1448 db_path = 'configurationStatus.{}.'.format(vca_index)
1449 db_dict = dict()
1450 if status:
1451 db_dict[db_path + 'status'] = status
1452 if element_under_configuration:
1453 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1454 if element_type:
1455 db_dict[db_path + 'elementType'] = element_type
1456 self.update_db_2("nsrs", nsr_id, db_dict)
1457 except DbException as e:
1458 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1459 .format(status, nsr_id, vca_index, e))
1460
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement optimization when the operation asks for the "PLA" placement engine, wait for its
    result and write the selected vim account at every affected vnfr. Nothing is done for any other value of
    operationParams.placement-engine.
    :param logging_text: prefix used for logging
    :param db_nslcmop: nslcmops record; "_id" and "operationParams" are read
    :param db_vnfrs: vnfrs indexed by member-vnf-index; the "_id" of each affected one is read
    :return: None
    :raises LcmException: when no result is found at "_admin.pla" of the nslcmops record after polling the
        database every 5 seconds, 5 times
    """
    placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
    if placement_engine != "PLA":
        return

    # the record is addressed by "_id" for messaging and database access: use the same key for the texts,
    # so a record without the "id" key cannot break the log or hide the timeout behind a KeyError
    nslcmop_id = db_nslcmop['_id']
    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(nslcmop_id))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    db_poll_interval = 5
    wait = db_poll_interval * 4
    pla_result = None
    while not pla_result and wait >= 0:
        await asyncio.sleep(db_poll_interval)
        wait -= db_poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    # a result without the "vnf" key is handled as a result without placements
    for pla_vnf in pla_result.get('vnf') or ():
        vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
        if not pla_vnf.get('vimAccountId') or not vnfr:
            # nothing selected for this vnf, or vnf not known: leave it as it is
            continue
        self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1484
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at "_admin.pla" of the nslcmops record whose id comes at
    params["placement"]["nslcmopId"]. Any failure is logged as a warning and not propagated.
    :param params: dictionary with the key "placement"
    :return: None
    """
    # bound before the try: the handler below uses it even when getting the id is what has failed
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1491
1492 async def instantiate(self, nsr_id, nslcmop_id):
1493 """
1494
1495 :param nsr_id: ns instance to deploy
1496 :param nslcmop_id: operation to run
1497 :return:
1498 """
1499
1500 # Try to lock HA task here
1501 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1502 if not task_is_locked_by_me:
1503 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1504 return
1505
1506 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1507 self.logger.debug(logging_text + "Enter")
1508
1509 # get all needed from database
1510
1511 # database nsrs record
1512 db_nsr = None
1513
1514 # database nslcmops record
1515 db_nslcmop = None
1516
1517 # update operation on nsrs
1518 db_nsr_update = {}
1519 # update operation on nslcmops
1520 db_nslcmop_update = {}
1521
1522 nslcmop_operation_state = None
1523 db_vnfrs = {} # vnf's info indexed by member-index
1524 # n2vc_info = {}
1525 tasks_dict_info = {} # from task to info text
1526 exc = None
1527 error_list = []
1528 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1529 # ^ stage, step, VIM progress
1530 try:
1531 # wait for any previous tasks in process
1532 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1533
1534 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1535 stage[1] = "Reading from database,"
1536 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1537 db_nsr_update["detailed-status"] = "creating"
1538 db_nsr_update["operational-status"] = "init"
1539 self._write_ns_status(
1540 nsr_id=nsr_id,
1541 ns_state="BUILDING",
1542 current_operation="INSTANTIATING",
1543 current_operation_id=nslcmop_id,
1544 other_update=db_nsr_update
1545 )
1546 self._write_op_status(
1547 op_id=nslcmop_id,
1548 stage=stage,
1549 queuePosition=0
1550 )
1551
1552 # read from db: operation
1553 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1554 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1555 ns_params = db_nslcmop.get("operationParams")
1556 if ns_params and ns_params.get("timeout_ns_deploy"):
1557 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1558 else:
1559 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1560
1561 # read from db: ns
1562 stage[1] = "Getting nsr={} from db".format(nsr_id)
1563 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1564 # nsd is replicated into ns (no db read)
1565 nsd = db_nsr["nsd"]
1566 # nsr_name = db_nsr["name"] # TODO short-name??
1567
1568 # read from db: vnf's of this ns
1569 stage[1] = "Getting vnfrs from db"
1570 self.logger.debug(logging_text + stage[1])
1571 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1572
1573 # read from db: vnfd's for every vnf
1574 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1575 db_vnfds = {} # every vnfd data indexed by vnf id
1576 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1577
1578 # for each vnf in ns, read vnfd
1579 for vnfr in db_vnfrs_list:
1580 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1581 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1582 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1583 # if we haven't this vnfd, read it from db
1584 if vnfd_id not in db_vnfds:
1585 # read from db
1586 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1587 self.logger.debug(logging_text + stage[1])
1588 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1589
1590 # store vnfd
1591 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1592 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1593 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1594
1595 # Get or generates the _admin.deployed.VCA list
1596 vca_deployed_list = None
1597 if db_nsr["_admin"].get("deployed"):
1598 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1599 if vca_deployed_list is None:
1600 vca_deployed_list = []
1601 configuration_status_list = []
1602 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1603 db_nsr_update["configurationStatus"] = configuration_status_list
1604 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1605 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1606 elif isinstance(vca_deployed_list, dict):
1607 # maintain backward compatibility. Change a dict to list at database
1608 vca_deployed_list = list(vca_deployed_list.values())
1609 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1610 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1611
1612 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1613 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1614 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1615
1616 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1617 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1618 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1619
1620 # n2vc_redesign STEP 2 Deploy Network Scenario
1621 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1622 self._write_op_status(
1623 op_id=nslcmop_id,
1624 stage=stage
1625 )
1626
1627 stage[1] = "Deploying KDUs,"
1628 # self.logger.debug(logging_text + "Before deploy_kdus")
1629 # Call to deploy_kdus in case exists the "vdu:kdu" param
1630 await self.deploy_kdus(
1631 logging_text=logging_text,
1632 nsr_id=nsr_id,
1633 nslcmop_id=nslcmop_id,
1634 db_vnfrs=db_vnfrs,
1635 db_vnfds=db_vnfds,
1636 task_instantiation_info=tasks_dict_info,
1637 )
1638
1639 stage[1] = "Getting VCA public key."
1640 # n2vc_redesign STEP 1 Get VCA public ssh-key
1641 # feature 1429. Add n2vc public key to needed VMs
1642 n2vc_key = self.n2vc.get_public_key()
1643 n2vc_key_list = [n2vc_key]
1644 if self.vca_config.get("public_key"):
1645 n2vc_key_list.append(self.vca_config["public_key"])
1646
1647 stage[1] = "Deploying NS at VIM."
1648 task_ro = asyncio.ensure_future(
1649 self.instantiate_RO(
1650 logging_text=logging_text,
1651 nsr_id=nsr_id,
1652 nsd=nsd,
1653 db_nsr=db_nsr,
1654 db_nslcmop=db_nslcmop,
1655 db_vnfrs=db_vnfrs,
1656 db_vnfds_ref=db_vnfds_ref,
1657 n2vc_key_list=n2vc_key_list,
1658 stage=stage
1659 )
1660 )
1661 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1662 tasks_dict_info[task_ro] = "Deploying at VIM"
1663
1664 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1665 stage[1] = "Deploying Execution Environments."
1666 self.logger.debug(logging_text + stage[1])
1667
1668 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1669 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1670 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1671 vnfd_id = c_vnf["vnfd-id-ref"]
1672 vnfd = db_vnfds_ref[vnfd_id]
1673 member_vnf_index = str(c_vnf["member-vnf-index"])
1674 db_vnfr = db_vnfrs[member_vnf_index]
1675 base_folder = vnfd["_admin"]["storage"]
1676 vdu_id = None
1677 vdu_index = 0
1678 vdu_name = None
1679 kdu_name = None
1680
1681 # Get additional parameters
1682 deploy_params = {}
1683 if db_vnfr.get("additionalParamsForVnf"):
1684 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1685
1686 descriptor_config = vnfd.get("vnf-configuration")
1687 if descriptor_config and descriptor_config.get("juju"):
1688 self._deploy_n2vc(
1689 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1690 db_nsr=db_nsr,
1691 db_vnfr=db_vnfr,
1692 nslcmop_id=nslcmop_id,
1693 nsr_id=nsr_id,
1694 nsi_id=nsi_id,
1695 vnfd_id=vnfd_id,
1696 vdu_id=vdu_id,
1697 kdu_name=kdu_name,
1698 member_vnf_index=member_vnf_index,
1699 vdu_index=vdu_index,
1700 vdu_name=vdu_name,
1701 deploy_params=deploy_params,
1702 descriptor_config=descriptor_config,
1703 base_folder=base_folder,
1704 task_instantiation_info=tasks_dict_info,
1705 stage=stage
1706 )
1707
1708 # Deploy charms for each VDU that supports one.
1709 for vdud in get_iterable(vnfd, 'vdu'):
1710 vdu_id = vdud["id"]
1711 descriptor_config = vdud.get('vdu-configuration')
1712 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1713 if vdur.get("additionalParams"):
1714 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1715 else:
1716 deploy_params_vdu = deploy_params
1717 if descriptor_config and descriptor_config.get("juju"):
1718 # look for vdu index in the db_vnfr["vdu"] section
1719 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1720 # if vdur["vdu-id-ref"] == vdu_id:
1721 # break
1722 # else:
1723 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1724 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1725 # vdu_name = vdur.get("name")
1726 vdu_name = None
1727 kdu_name = None
1728 for vdu_index in range(int(vdud.get("count", 1))):
1729 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1730 self._deploy_n2vc(
1731 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1732 member_vnf_index, vdu_id, vdu_index),
1733 db_nsr=db_nsr,
1734 db_vnfr=db_vnfr,
1735 nslcmop_id=nslcmop_id,
1736 nsr_id=nsr_id,
1737 nsi_id=nsi_id,
1738 vnfd_id=vnfd_id,
1739 vdu_id=vdu_id,
1740 kdu_name=kdu_name,
1741 member_vnf_index=member_vnf_index,
1742 vdu_index=vdu_index,
1743 vdu_name=vdu_name,
1744 deploy_params=deploy_params_vdu,
1745 descriptor_config=descriptor_config,
1746 base_folder=base_folder,
1747 task_instantiation_info=tasks_dict_info,
1748 stage=stage
1749 )
1750 for kdud in get_iterable(vnfd, 'kdu'):
1751 kdu_name = kdud["name"]
1752 descriptor_config = kdud.get('kdu-configuration')
1753 if descriptor_config and descriptor_config.get("juju"):
1754 vdu_id = None
1755 vdu_index = 0
1756 vdu_name = None
1757 # look for vdu index in the db_vnfr["vdu"] section
1758 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1759 # if vdur["vdu-id-ref"] == vdu_id:
1760 # break
1761 # else:
1762 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1763 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1764 # vdu_name = vdur.get("name")
1765 # vdu_name = None
1766
1767 self._deploy_n2vc(
1768 logging_text=logging_text,
1769 db_nsr=db_nsr,
1770 db_vnfr=db_vnfr,
1771 nslcmop_id=nslcmop_id,
1772 nsr_id=nsr_id,
1773 nsi_id=nsi_id,
1774 vnfd_id=vnfd_id,
1775 vdu_id=vdu_id,
1776 kdu_name=kdu_name,
1777 member_vnf_index=member_vnf_index,
1778 vdu_index=vdu_index,
1779 vdu_name=vdu_name,
1780 deploy_params=deploy_params,
1781 descriptor_config=descriptor_config,
1782 base_folder=base_folder,
1783 task_instantiation_info=tasks_dict_info,
1784 stage=stage
1785 )
1786
1787 # Check if this NS has a charm configuration
1788 descriptor_config = nsd.get("ns-configuration")
1789 if descriptor_config and descriptor_config.get("juju"):
1790 vnfd_id = None
1791 db_vnfr = None
1792 member_vnf_index = None
1793 vdu_id = None
1794 kdu_name = None
1795 vdu_index = 0
1796 vdu_name = None
1797
1798 # Get additional parameters
1799 deploy_params = {}
1800 if db_nsr.get("additionalParamsForNs"):
1801 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1802 base_folder = nsd["_admin"]["storage"]
1803 self._deploy_n2vc(
1804 logging_text=logging_text,
1805 db_nsr=db_nsr,
1806 db_vnfr=db_vnfr,
1807 nslcmop_id=nslcmop_id,
1808 nsr_id=nsr_id,
1809 nsi_id=nsi_id,
1810 vnfd_id=vnfd_id,
1811 vdu_id=vdu_id,
1812 kdu_name=kdu_name,
1813 member_vnf_index=member_vnf_index,
1814 vdu_index=vdu_index,
1815 vdu_name=vdu_name,
1816 deploy_params=deploy_params,
1817 descriptor_config=descriptor_config,
1818 base_folder=base_folder,
1819 task_instantiation_info=tasks_dict_info,
1820 stage=stage
1821 )
1822
1823 # rest of stuff will be done at finally
1824
1825 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1826 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1827 exc = e
1828 except asyncio.CancelledError:
1829 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1830 exc = "Operation was cancelled"
1831 except Exception as e:
1832 exc = traceback.format_exc()
1833 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1834 finally:
1835 if exc:
1836 error_list.append(str(exc))
1837 try:
1838 # wait for pending tasks
1839 if tasks_dict_info:
1840 stage[1] = "Waiting for instantiate pending tasks."
1841 self.logger.debug(logging_text + stage[1])
1842 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1843 stage, nslcmop_id, nsr_id=nsr_id)
1844 stage[1] = stage[2] = ""
1845 except asyncio.CancelledError:
1846 error_list.append("Cancelled")
1847 # TODO cancel all tasks
1848 except Exception as exc:
1849 error_list.append(str(exc))
1850
1851 # update operation-status
1852 db_nsr_update["operational-status"] = "running"
1853 # let's begin with VCA 'configured' status (later we can change it)
1854 db_nsr_update["config-status"] = "configured"
1855 for task, task_name in tasks_dict_info.items():
1856 if not task.done() or task.cancelled() or task.exception():
1857 if task_name.startswith(self.task_name_deploy_vca):
1858 # A N2VC task is pending
1859 db_nsr_update["config-status"] = "failed"
1860 else:
1861 # RO or KDU task is pending
1862 db_nsr_update["operational-status"] = "failed"
1863
1864 # update status at database
1865 if error_list:
1866 error_detail = ". ".join(error_list)
1867 self.logger.error(logging_text + error_detail)
1868 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1869 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1870
1871 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1872 db_nslcmop_update["detailed-status"] = error_detail
1873 nslcmop_operation_state = "FAILED"
1874 ns_state = "BROKEN"
1875 else:
1876 error_detail = None
1877 error_description_nsr = error_description_nslcmop = None
1878 ns_state = "READY"
1879 db_nsr_update["detailed-status"] = "Done"
1880 db_nslcmop_update["detailed-status"] = "Done"
1881 nslcmop_operation_state = "COMPLETED"
1882
1883 if db_nsr:
1884 self._write_ns_status(
1885 nsr_id=nsr_id,
1886 ns_state=ns_state,
1887 current_operation="IDLE",
1888 current_operation_id=None,
1889 error_description=error_description_nsr,
1890 error_detail=error_detail,
1891 other_update=db_nsr_update
1892 )
1893 if db_nslcmop:
1894 self._write_op_status(
1895 op_id=nslcmop_id,
1896 stage="",
1897 error_message=error_description_nslcmop,
1898 operation_state=nslcmop_operation_state,
1899 other_update=db_nslcmop_update,
1900 )
1901
1902 if nslcmop_operation_state:
1903 try:
1904 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1905 "operationState": nslcmop_operation_state},
1906 loop=self.loop)
1907 except Exception as e:
1908 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1909
1910 self.logger.debug(logging_text + "Exit")
1911 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1912
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations in which the VCA stored at position vca_index takes part.

    Steps:
        1. find all relations (ns-configuration and vnf-configuration) that reference this VCA
        2. wait until both peers of every relation have 'config_sw_installed' set
        3. add the relation by means of self.n2vc.add_relation

    A pending relation is discarded when one of its peers is reported with status 'BROKEN' at the
    'configurationStatus' list of the ns record.

    :param logging_text: prefix to use at logging
    :param nsr_id: _id of the ns record at database
    :param vca_index: position of this VCA at nsrs:_admin.deployed.VCA
    :param timeout: maximum time, in seconds, waiting for the peers
    :return: True when no relation remains pending (all added or discarded). False on timeout or on any exception
    """

    def _relation_ids(relation):
        # ids of the two entities (peers) of a relation descriptor
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    async def _process_relations(relations, id_key, vca_list, vca_status_list):
        """
        Try to add every pending relation.
        :param relations: pending relations. It is not modified
        :param id_key: key of the VCA entry to compare against the entity id: 'member-vnf-index' or 'vdu_id'
        :param vca_list: content of nsrs:_admin.deployed.VCA
        :param vca_status_list: content of nsrs:configurationStatus; walked in parallel with vca_list
        :return: list with the relations that are still pending and must be retried
        """
        still_pending = []
        for relation in relations:
            entities = relation.get('entities')
            from_id, to_id = _relation_ids(relation)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # only peers with the configuration software already installed can be related
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(id_key) == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(id_key) == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation. It is done, so it is not kept at pending list
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=entities[0].get('endpoint'),
                    endpoint_2=entities[1].get('endpoint'))
                continue

            # check failed peers. zip() stops at the shortest list, so a short status list cannot fail
            peer_broken = any(
                vca and vca_status and vca_status.get('status') == 'BROKEN' and vca.get(id_key) in (from_id, to_id)
                for vca, vca_status in zip(vca_list, vca_status_list or ()))
            if peer_broken:
                # peer broken: the relation will never be possible, discard it
                self.logger.debug(logging_text + ' peer broken, discarding relation {}'.format(relation))
            else:
                still_pending.append(relation)
        return still_pending

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # ns-configuration relations where this VCA is one of the peers
        ns_relations = [r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
                        if my_vca.get('member-vnf-index') in _relation_ids(r)]

        # vnf-configuration relations where this VCA is one of the peers
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            vnf_relations += [r for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()
                              if my_vca.get('vdu_id') in _relation_ids(r)]

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: wait for the peers and add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database, other tasks update _admin.deployed.VCA and configurationStatus
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
            vca_status_list = db_nsr.get('configurationStatus')

            # NS relations match peers by member-vnf-index; VNF relations match them by vdu_id
            ns_relations = await _process_relations(ns_relations, 'member-vnf-index', vca_list, vca_status_list)
            vnf_relations = await _process_relations(vnf_relations, 'vdu_id', vca_list, vca_status_list)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2072
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the installation of every KDU found at the "kdur" list of the vnf records.

    For each kdur: gets the kdu model (helm-chart or juju-bundle), gets the uuid of the target k8s cluster,
    synchronizes the helm repos of that cluster (once per cluster uuid), writes an entry at
    nsrs:_admin.deployed.K8s.<index> and launches the install in an asyncio task. Tasks are not awaited here:
    they are registered at self.lcm_tasks and added to task_instantiation_info.

    :param logging_text: prefix to use at logging
    :param nsr_id: _id of the ns record at database
    :param nslcmop_id: _id of the operation, used for registering the tasks
    :param db_vnfrs: dictionary with the vnf records. Only its values are used
    :param db_vnfds: dictionary with the vnf descriptors indexed by the vnfd _id
    :param task_instantiation_info: dictionary to be filled with {task: description} for every launched task
    :return: None. Raises LcmException on error; asyncio.CancelledError is re-raised without changes
    """
    # uuids already obtained from database, indexed by cluster type and then by the k8scluster _id
    cluster_uuid_cache = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # return the uuid stored at k8sclusters:_admin.<cluster_type>.id, reading database only the first time
        known_uuids = cluster_uuid_cache[cluster_type]
        if cluster_id in known_uuids:
            return known_uuids[cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        known_uuids[cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # helm clusters whose repos have been already synchronized in this call
        synchronized_clusters = []

        # walk every kdur of every vnfr; index is the position at _admin.deployed.K8s
        all_kdurs = ((vnfr_data, kdur) for vnfr_data in db_vnfrs.values()
                     for kdur in get_iterable(vnfr_data, "kdur"))
        for index, (vnfr_data, kdur) in enumerate(all_kdurs):
            desc_params = self._format_additional_params(kdur.get("additionalParams"))
            vnfd_id = vnfr_data.get('vnfd-id')
            pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))

            # the kdu type is the first of these keys with content at kdur
            for k8sclustertype in ("helm-chart", "juju-bundle"):
                if kdur.get(k8sclustertype):
                    kdumodel = kdur[k8sclustertype]
                    break
            else:
                raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                   "juju-bundle. Maybe an old NBI version is running".
                                   format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))

            # check if kdumodel is a file or folder stored inside the package
            try:
                # path format: /vnfdid/pkkdir/kdumodel
                filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
                if any(self.fs.file_exists(filename, mode=fs_mode) for fs_mode in ('file', 'dir')):
                    kdumodel = self.fs.path + filename
            except asyncio.CancelledError:
                raise
            except Exception:  # it is not a file
                pass

            k8s_cluster_id = kdur["k8s-cluster"]["id"]
            step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
            cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)

            if k8sclustertype == "helm-chart" and cluster_uuid not in synchronized_clusters:
                del_repo_list, added_repo_dict = await asyncio.ensure_future(
                    self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                if del_repo_list or added_repo_dict:
                    # reflect at k8sclusters record the repos removed and added by the synchronization
                    unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                    updated = {'_admin.helm_charts_added.' + item: name for item, name in added_repo_dict.items()}
                    self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                      "to_add: {}".format(k8s_cluster_id, del_repo_list, added_repo_dict))
                    self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                synchronized_clusters.append(cluster_uuid)

            member_vnf_index = vnfr_data["member-vnf-index-ref"]
            kdu_name = kdur["kdu-name"]
            step = "Instantiating KDU {}.{} in k8s cluster {}".format(member_vnf_index, kdu_name, k8s_cluster_id)

            # store the deployment info before launching; "kdu-instance" is still unknown
            k8s_path = "_admin.deployed.K8s.{}".format(index)
            db_nsr_update[k8s_path] = {
                "kdu-instance": None,
                "k8scluster-uuid": cluster_uuid,
                "k8scluster-type": k8sclustertype,
                "member-vnf-index": member_vnf_index,
                "kdu-name": kdu_name,
                "kdu-model": kdumodel,
            }
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # database location handed to the connector (db_dict argument of install)
            db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": k8s_path}

            install_coroutine = self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True, params=desc_params,
                db_dict=db_dict, timeout=600, kdu_name=kdu_name)
            task = asyncio.ensure_future(install_coroutine)

            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
            task_instantiation_info[task] = "Deploying KDU {}".format(kdu_name)

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg)
    finally:
        # write whatever is still pending at db_nsr_update
        # NOTE(review): presumably update_db_2 empties the dictionary once written - confirm at LcmBase
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2183
2184 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2185 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2186 base_folder, task_instantiation_info, stage):
2187 # launch instantiate_N2VC in a asyncio task and register task object
2188 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2189 # if not found, create one entry and update database
2190
2191 # fill db_nsr._admin.deployed.VCA.<index>
2192 vca_index = -1
2193 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2194 if not vca_deployed:
2195 continue
2196 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2197 vca_deployed.get("vdu_id") == vdu_id and \
2198 vca_deployed.get("kdu_name") == kdu_name and \
2199 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2200 break
2201 else:
2202 # not found, create one.
2203 vca_deployed = {
2204 "member-vnf-index": member_vnf_index,
2205 "vdu_id": vdu_id,
2206 "kdu_name": kdu_name,
2207 "vdu_count_index": vdu_index,
2208 "operational-status": "init", # TODO revise
2209 "detailed-status": "", # TODO revise
2210 "step": "initial-deploy", # TODO revise
2211 "vnfd_id": vnfd_id,
2212 "vdu_name": vdu_name,
2213 }
2214 vca_index += 1
2215
2216 # create VCA and configurationStatus in db
2217 db_dict = {
2218 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2219 "configurationStatus.{}".format(vca_index): dict()
2220 }
2221 self.update_db_2("nsrs", nsr_id, db_dict)
2222
2223 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2224
2225 # Launch task
2226 task_n2vc = asyncio.ensure_future(
2227 self.instantiate_N2VC(
2228 logging_text=logging_text,
2229 vca_index=vca_index,
2230 nsi_id=nsi_id,
2231 db_nsr=db_nsr,
2232 db_vnfr=db_vnfr,
2233 vdu_id=vdu_id,
2234 kdu_name=kdu_name,
2235 vdu_index=vdu_index,
2236 deploy_params=deploy_params,
2237 config_descriptor=descriptor_config,
2238 base_folder=base_folder,
2239 nslcmop_id=nslcmop_id,
2240 stage=stage
2241 )
2242 )
2243 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2244 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2245 member_vnf_index or "", vdu_id or "")
2246
2247 # Check if this VNFD has a configured terminate action
2248 def _has_terminate_config_primitive(self, vnfd):
2249 vnf_config = vnfd.get("vnf-configuration")
2250 if vnf_config and vnf_config.get("terminate-config-primitive"):
2251 return True
2252 else:
2253 return False
2254
2255 @staticmethod
2256 def _get_terminate_config_primitive_seq_list(vnfd):
2257 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2258 # No need to check for existing primitive twice, already done before
2259 vnf_config = vnfd.get("vnf-configuration")
2260 seq_list = vnf_config.get("terminate-config-primitive")
2261 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2262 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2263 return seq_list_sorted
2264
2265 @staticmethod
2266 def _create_nslcmop(nsr_id, operation, params):
2267 """
2268 Creates a ns-lcm-opp content to be stored at database.
2269 :param nsr_id: internal id of the instance
2270 :param operation: instantiate, terminate, scale, action, ...
2271 :param params: user parameters for the operation
2272 :return: dictionary following SOL005 format
2273 """
2274 # Raise exception if invalid arguments
2275 if not (nsr_id and operation and params):
2276 raise LcmException(
2277 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2278 now = time()
2279 _id = str(uuid4())
2280 nslcmop = {
2281 "id": _id,
2282 "_id": _id,
2283 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2284 "operationState": "PROCESSING",
2285 "statusEnteredTime": now,
2286 "nsInstanceId": nsr_id,
2287 "lcmOperationType": operation,
2288 "startTime": now,
2289 "isAutomaticInvocation": False,
2290 "operationParams": params,
2291 "isCancelPending": False,
2292 "links": {
2293 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2294 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2295 }
2296 }
2297 return nslcmop
2298
2299 def _format_additional_params(self, params):
2300 params = params or {}
2301 for key, value in params.items():
2302 if str(value).startswith("!!yaml "):
2303 params[key] = yaml.safe_load(value[7:])
2304 return params
2305
2306 def _get_terminate_primitive_params(self, seq, vnf_index):
2307 primitive = seq.get('name')
2308 primitive_params = {}
2309 params = {
2310 "member_vnf_index": vnf_index,
2311 "primitive": primitive,
2312 "primitive_params": primitive_params,
2313 }
2314 desc_params = {}
2315 return self._map_primitive_params(seq, params, desc_params)
2316
2317 # sub-operations
2318
2319 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2320 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2321 if (op.get('operationState') == 'COMPLETED'):
2322 # b. Skip sub-operation
2323 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2324 return self.SUBOPERATION_STATUS_SKIP
2325 else:
2326 # c. Reintent executing sub-operation
2327 # The sub-operation exists, and operationState != 'COMPLETED'
2328 # Update operationState = 'PROCESSING' to indicate a reintent.
2329 operationState = 'PROCESSING'
2330 detailed_status = 'In progress'
2331 self._update_suboperation_status(
2332 db_nslcmop, op_index, operationState, detailed_status)
2333 # Return the sub-operation index
2334 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2335 # with arguments extracted from the sub-operation
2336 return op_index
2337
2338 # Find a sub-operation where all keys in a matching dictionary must match
2339 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2340 def _find_suboperation(self, db_nslcmop, match):
2341 if (db_nslcmop and match):
2342 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2343 for i, op in enumerate(op_list):
2344 if all(op.get(k) == match[k] for k in match):
2345 return i
2346 return self.SUBOPERATION_STATUS_NOT_FOUND
2347
2348 # Update status for a sub-operation given its index
2349 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2350 # Update DB for HA tasks
2351 q_filter = {'_id': db_nslcmop['_id']}
2352 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2353 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2354 self.db.set_one("nslcmops",
2355 q_filter=q_filter,
2356 update_dict=update_dict,
2357 fail_on_empty=False)
2358
2359 # Add sub-operation, return the index of the added sub-operation
2360 # Optionally, set operationState, detailed-status, and operationType
2361 # Status and type are currently set for 'scale' sub-operations:
2362 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2363 # 'detailed-status' : status message
2364 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2365 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2366 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2367 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2368 RO_nsr_id=None, RO_scaling_info=None):
2369 if not db_nslcmop:
2370 return self.SUBOPERATION_STATUS_NOT_FOUND
2371 # Get the "_admin.operations" list, if it exists
2372 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2373 op_list = db_nslcmop_admin.get('operations')
2374 # Create or append to the "_admin.operations" list
2375 new_op = {'member_vnf_index': vnf_index,
2376 'vdu_id': vdu_id,
2377 'vdu_count_index': vdu_count_index,
2378 'primitive': primitive,
2379 'primitive_params': mapped_primitive_params}
2380 if operationState:
2381 new_op['operationState'] = operationState
2382 if detailed_status:
2383 new_op['detailed-status'] = detailed_status
2384 if operationType:
2385 new_op['lcmOperationType'] = operationType
2386 if RO_nsr_id:
2387 new_op['RO_nsr_id'] = RO_nsr_id
2388 if RO_scaling_info:
2389 new_op['RO_scaling_info'] = RO_scaling_info
2390 if not op_list:
2391 # No existing operations, create key 'operations' with current operation as first list element
2392 db_nslcmop_admin.update({'operations': [new_op]})
2393 op_list = db_nslcmop_admin.get('operations')
2394 else:
2395 # Existing operations, append operation to list
2396 op_list.append(new_op)
2397
2398 db_nslcmop_update = {'_admin.operations': op_list}
2399 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2400 op_index = len(op_list) - 1
2401 return op_index
2402
2403 # Helper methods for scale() sub-operations
2404
2405 # pre-scale/post-scale:
2406 # Check for 3 different cases:
2407 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2408 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2409 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2410 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2411 operationType, RO_nsr_id=None, RO_scaling_info=None):
2412 # Find this sub-operation
2413 if (RO_nsr_id and RO_scaling_info):
2414 operationType = 'SCALE-RO'
2415 match = {
2416 'member_vnf_index': vnf_index,
2417 'RO_nsr_id': RO_nsr_id,
2418 'RO_scaling_info': RO_scaling_info,
2419 }
2420 else:
2421 match = {
2422 'member_vnf_index': vnf_index,
2423 'primitive': vnf_config_primitive,
2424 'primitive_params': primitive_params,
2425 'lcmOperationType': operationType
2426 }
2427 op_index = self._find_suboperation(db_nslcmop, match)
2428 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2429 # a. New sub-operation
2430 # The sub-operation does not exist, add it.
2431 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2432 # The following parameters are set to None for all kind of scaling:
2433 vdu_id = None
2434 vdu_count_index = None
2435 vdu_name = None
2436 if (RO_nsr_id and RO_scaling_info):
2437 vnf_config_primitive = None
2438 primitive_params = None
2439 else:
2440 RO_nsr_id = None
2441 RO_scaling_info = None
2442 # Initial status for sub-operation
2443 operationState = 'PROCESSING'
2444 detailed_status = 'In progress'
2445 # Add sub-operation for pre/post-scaling (zero or more operations)
2446 self._add_suboperation(db_nslcmop,
2447 vnf_index,
2448 vdu_id,
2449 vdu_count_index,
2450 vdu_name,
2451 vnf_config_primitive,
2452 primitive_params,
2453 operationState,
2454 detailed_status,
2455 operationType,
2456 RO_nsr_id,
2457 RO_scaling_info)
2458 return self.SUBOPERATION_STATUS_NEW
2459 else:
2460 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2461 # or op_index (operationState != 'COMPLETED')
2462 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2463
2464 # Function to return execution_environment id
2465
2466 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2467 # TODO vdu_index_count
2468 for vca in vca_deployed_list:
2469 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2470 return vca["ee_id"]
2471
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
    """
    Execute the terminate primitives of one deployed VCA and destroy its execution environment
    (only if destroy_ee=True)
    :param logging_text: prefix added to the log messages
    :param db_nslcmop: database content of the nslcmop; a sub-operation is added to it per primitive
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy the execution environment here, because all of them will be
        destroyed at once
    :return: None or exception
    """
    primitives = config_descriptor.get("terminate-config-primitive")
    vdu_id = vca_deployed.get("vdu_id")
    vdu_count_index = vca_deployed.get("vdu_count_index")
    vdu_name = vca_deployed.get("vdu_name")
    vnf_index = vca_deployed.get("member-vnf-index")

    if primitives and vca_deployed.get("needed_terminate"):
        # Primitives are executed following the numerical, ascending order of their 'seq' tag
        for seq in sorted(primitives, key=lambda item: int(item['seq'])):
            primitive = seq.get('name')
            self.logger.debug(logging_text + "Calling terminate action for vnf_member_index={} primitive={}".format(
                vnf_index, primitive))
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

            # Register the sub-operation before running it
            self._add_suboperation(db_nslcmop,
                                   vnf_index,
                                   vdu_id,
                                   vdu_count_index,
                                   vdu_name,
                                   primitive,
                                   mapped_primitive_params)
            # Sub-operations: Call _ns_execute_primitive() instead of action()
            try:
                result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                         mapped_primitive_params)
            except LcmException:
                # this happens when VCA is not deployed. In this case it is not needed to terminate
                continue
            if result not in ('COMPLETED', 'PARTIALLY_COMPLETED'):
                raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                   "error {}".format(seq.get("name"), vnf_index, result_detail))

        # Record that this VCA does not need its terminate primitives executed again
        self.update_db_2("nsrs", db_nslcmop["nsInstanceId"],
                         {"_admin.deployed.VCA.{}.needed_terminate".format(vca_index): False})

    if destroy_ee:
        await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2528
async def _delete_N2VC(self, nsr_id: str):
    """
    Delete the whole N2VC namespace of a ns, marking all its configuration status before and after
    :param nsr_id: ns instance identifier; the namespace deleted is "." + nsr_id
    :return: None
    """
    self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
    await self.n2vc.delete_namespace(namespace="." + nsr_id, total_timeout=self.timeout_charm_delete)
    self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2534
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO: deletes the ns, waits until the delete action finishes and, if nothing
    failed, deletes the nsd and the vnfds stored at RO. The progress is written with update_db_2() and
    _write_op_status()
    :param logging_text: prefix added to the log messages
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id:
    :param nslcmop_id:
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with all the failures concatenated when something cannot be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                              ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # An explicit exception instead of 'assert', which is removed when running with -O.
                    # It is reported as a "delete error" by the generic handler below
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    # write at database only when the status text changes
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # The 'loop' argument of asyncio.sleep() was removed in Python 3.10; the running loop is used
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            # nothing to delete at RO; these fields are written by the final update of this method
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds, only if everything before was deleted
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2677
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance. The work is done in three stages: prepare the task reading the database,
    execute the terminate primitives of the execution environments that need it, and delete everything
    (execution environments, KDUs and the deployment at RO). The final result is written at database and
    notified with a "terminated" message on topic "ns"
    :param nsr_id: ns instance identifier
    :param nslcmop_id: identifier of the terminate operation
    :return: None. Errors are not raised; they are written at the ns and operation status
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = {}  # stays empty if the nslcmop cannot be read from database
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to terminate; the 'finally' clause still writes the final status
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        if not operation_params.get("skip_terminate_primitives"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
                    continue
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
                                                               vca_index, False))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                # These tasks have already been waited for. Forget them before leaving, otherwise the
                # 'finally' clause waits for them again and annotates every error twice
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."
        stage[1] = "Deleting all execution environments."
        self.logger.debug(logging_text + stage[1])

        task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_N2VC(nsr_id=nsr_id),
                                                                timeout=self.timeout_charm_delete))
        # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
        tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        task_delete_ro = asyncio.ensure_future(
            self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # the rest of the work (waiting for these tasks, writing the result) is done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            # a different name than 'exc' is used so that the outer variable is not unbound on exit
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        if db_nslcmop:
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2892
2893 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
2894 time_start = time()
2895 error_detail_list = []
2896 error_list = []
2897 pending_tasks = list(created_tasks_info.keys())
2898 num_tasks = len(pending_tasks)
2899 num_done = 0
2900 stage[1] = "{}/{}.".format(num_done, num_tasks)
2901 self._write_op_status(nslcmop_id, stage)
2902 while pending_tasks:
2903 new_error = None
2904 _timeout = timeout + time_start - time()
2905 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
2906 return_when=asyncio.FIRST_COMPLETED)
2907 num_done += len(done)
2908 if not done: # Timeout
2909 for task in pending_tasks:
2910 new_error = created_tasks_info[task] + ": Timeout"
2911 error_detail_list.append(new_error)
2912 error_list.append(new_error)
2913 break
2914 for task in done:
2915 if task.cancelled():
2916 exc = "Cancelled"
2917 else:
2918 exc = task.exception()
2919 if exc:
2920 if isinstance(exc, asyncio.TimeoutError):
2921 exc = "Timeout"
2922 new_error = created_tasks_info[task] + ": {}".format(exc)
2923 error_list.append(created_tasks_info[task])
2924 error_detail_list.append(new_error)
2925 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
2926 self.logger.error(logging_text + new_error)
2927 else:
2928 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
2929 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
2930 else:
2931 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
2932 stage[1] = "{}/{}.".format(num_done, num_tasks)
2933 if new_error:
2934 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
2935 if nsr_id: # update also nsr
2936 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
2937 "errorDetail": ". ".join(error_detail_list)})
2938 self._write_op_status(nslcmop_id, stage)
2939 return error_detail_list
2940
2941 @staticmethod
2942 def _map_primitive_params(primitive_desc, params, instantiation_params):
2943 """
2944 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2945 The default-value is used. If it is between < > it look for a value at instantiation_params
2946 :param primitive_desc: portion of VNFD/NSD that describes primitive
2947 :param params: Params provided by user
2948 :param instantiation_params: Instantiation params provided by user
2949 :return: a dictionary with the calculated params
2950 """
2951 calculated_params = {}
2952 for parameter in primitive_desc.get("parameter", ()):
2953 param_name = parameter["name"]
2954 if param_name in params:
2955 calculated_params[param_name] = params[param_name]
2956 elif "default-value" in parameter or "value" in parameter:
2957 if "value" in parameter:
2958 calculated_params[param_name] = parameter["value"]
2959 else:
2960 calculated_params[param_name] = parameter["default-value"]
2961 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2962 and calculated_params[param_name].endswith(">"):
2963 if calculated_params[param_name][1:-1] in instantiation_params:
2964 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2965 else:
2966 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2967 format(calculated_params[param_name], primitive_desc["name"]))
2968 else:
2969 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2970 format(param_name, primitive_desc["name"]))
2971
2972 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2973 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2974 width=256)
2975 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2976 calculated_params[param_name] = calculated_params[param_name][7:]
2977
2978 # add always ns_config_info if primitive name is config
2979 if primitive_desc["name"] == "config":
2980 if "ns_config_info" in instantiation_params:
2981 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2982 return calculated_params
2983
2984 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
2985 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
2986 for vca in deployed_vca:
2987 if not vca:
2988 continue
2989 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
2990 continue
2991 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
2992 continue
2993 if kdu_name and kdu_name != vca["kdu_name"]:
2994 continue
2995 break
2996 else:
2997 # vca_deployed not found
2998 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
2999 "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3000
3001 # get ee_id
3002 ee_id = vca.get("ee_id")
3003 if not ee_id:
3004 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3005 "execution environment"
3006 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3007 return ee_id
3008
3009 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3010 retries_interval=30, timeout=None) -> (str, str):
3011 try:
3012 if primitive == "config":
3013 primitive_params = {"params": primitive_params}
3014
3015 while retries >= 0:
3016 try:
3017 output = await asyncio.wait_for(
3018 self.n2vc.exec_primitive(
3019 ee_id=ee_id,
3020 primitive_name=primitive,
3021 params_dict=primitive_params,
3022 progress_timeout=self.timeout_progress_primitive,
3023 total_timeout=self.timeout_primitive),
3024 timeout=timeout or self.timeout_primitive)
3025 # execution was OK
3026 break
3027 except asyncio.CancelledError:
3028 raise
3029 except Exception as e: # asyncio.TimeoutError
3030 if isinstance(e, asyncio.TimeoutError):
3031 e = "Timeout"
3032 retries -= 1
3033 if retries >= 0:
3034 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3035 # wait and retry
3036 await asyncio.sleep(retries_interval, loop=self.loop)
3037 else:
3038 return 'FAILED', str(e)
3039
3040 return 'COMPLETED', output
3041
3042 except (LcmException, asyncio.CancelledError):
3043 raise
3044 except Exception as e:
3045 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3046
3047 async def action(self, nsr_id, nslcmop_id):
3048
3049 # Try to lock HA task here
3050 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3051 if not task_is_locked_by_me:
3052 return
3053
3054 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3055 self.logger.debug(logging_text + "Enter")
3056 # get all needed from database
3057 db_nsr = None
3058 db_nslcmop = None
3059 db_nsr_update = {}
3060 db_nslcmop_update = {}
3061 nslcmop_operation_state = None
3062 error_description_nslcmop = None
3063 exc = None
3064 try:
3065 # wait for any previous tasks in process
3066 step = "Waiting for previous operations to terminate"
3067 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3068
3069 self._write_ns_status(
3070 nsr_id=nsr_id,
3071 ns_state=None,
3072 current_operation="RUNNING ACTION",
3073 current_operation_id=nslcmop_id
3074 )
3075
3076 step = "Getting information from database"
3077 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3078 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3079
3080 nsr_deployed = db_nsr["_admin"].get("deployed")
3081 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3082 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3083 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3084 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3085 primitive = db_nslcmop["operationParams"]["primitive"]
3086 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3087 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3088
3089 if vnf_index:
3090 step = "Getting vnfr from database"
3091 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3092 step = "Getting vnfd from database"
3093 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3094 else:
3095 step = "Getting nsd from database"
3096 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3097
3098 # for backward compatibility
3099 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3100 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3101 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3102 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3103
3104 # look for primitive
3105 config_primitive_desc = None
3106 if vdu_id:
3107 for vdu in get_iterable(db_vnfd, "vdu"):
3108 if vdu_id == vdu["id"]:
3109 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3110 if config_primitive["name"] == primitive:
3111 config_primitive_desc = config_primitive
3112 break
3113 break
3114 elif kdu_name:
3115 for kdu in get_iterable(db_vnfd, "kdu"):
3116 if kdu_name == kdu["name"]:
3117 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3118 if config_primitive["name"] == primitive:
3119 config_primitive_desc = config_primitive
3120 break
3121 break
3122 elif vnf_index:
3123 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3124 if config_primitive["name"] == primitive:
3125 config_primitive_desc = config_primitive
3126 break
3127 else:
3128 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3129 if config_primitive["name"] == primitive:
3130 config_primitive_desc = config_primitive
3131 break
3132
3133 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3134 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3135 format(primitive))
3136
3137 if vnf_index:
3138 if vdu_id:
3139 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3140 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3141 elif kdu_name:
3142 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3143 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3144 else:
3145 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3146 else:
3147 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3148
3149 # TODO check if ns is in a proper status
3150 if kdu_name and primitive in ("upgrade", "rollback", "status"):
3151 # kdur and desc_params already set from before
3152 if primitive_params:
3153 desc_params.update(primitive_params)
3154 # TODO Check if we will need something at vnf level
3155 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3156 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3157 break
3158 else:
3159 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3160
3161 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3162 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3163 raise LcmException(msg)
3164
3165 db_dict = {"collection": "nsrs",
3166 "filter": {"_id": nsr_id},
3167 "path": "_admin.deployed.K8s.{}".format(index)}
3168 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3169 step = "Executing kdu {}".format(primitive)
3170 if primitive == "upgrade":
3171 if desc_params.get("kdu_model"):
3172 kdu_model = desc_params.get("kdu_model")
3173 del desc_params["kdu_model"]
3174 else:
3175 kdu_model = kdu.get("kdu-model")
3176 parts = kdu_model.split(sep=":")
3177 if len(parts) == 2:
3178 kdu_model = parts[0]
3179
3180 detailed_status = await asyncio.wait_for(
3181 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3182 cluster_uuid=kdu.get("k8scluster-uuid"),
3183 kdu_instance=kdu.get("kdu-instance"),
3184 atomic=True, kdu_model=kdu_model,
3185 params=desc_params, db_dict=db_dict,
3186 timeout=timeout_ns_action),
3187 timeout=timeout_ns_action + 10)
3188 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3189 elif primitive == "rollback":
3190 detailed_status = await asyncio.wait_for(
3191 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3192 cluster_uuid=kdu.get("k8scluster-uuid"),
3193 kdu_instance=kdu.get("kdu-instance"),
3194 db_dict=db_dict),
3195 timeout=timeout_ns_action)
3196 elif primitive == "status":
3197 detailed_status = await asyncio.wait_for(
3198 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3199 cluster_uuid=kdu.get("k8scluster-uuid"),
3200 kdu_instance=kdu.get("kdu-instance")),
3201 timeout=timeout_ns_action)
3202
3203 if detailed_status:
3204 nslcmop_operation_state = 'COMPLETED'
3205 else:
3206 detailed_status = ''
3207 nslcmop_operation_state = 'FAILED'
3208
3209 else:
3210 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3211 self._look_for_deployed_vca(nsr_deployed["VCA"],
3212 member_vnf_index=vnf_index,
3213 vdu_id=vdu_id,
3214 vdu_count_index=vdu_count_index),
3215 primitive=primitive,
3216 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3217 timeout=timeout_ns_action)
3218
3219 db_nslcmop_update["detailed-status"] = detailed_status
3220 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3221 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3222 detailed_status))
3223 return # database update is called inside finally
3224
3225 except (DbException, LcmException, N2VCException) as e:
3226 self.logger.error(logging_text + "Exit Exception {}".format(e))
3227 exc = e
3228 except asyncio.CancelledError:
3229 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3230 exc = "Operation was cancelled"
3231 except asyncio.TimeoutError:
3232 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3233 exc = "Timeout"
3234 except Exception as e:
3235 exc = traceback.format_exc()
3236 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3237 finally:
3238 if exc:
3239 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3240 "FAILED {}: {}".format(step, exc)
3241 nslcmop_operation_state = "FAILED"
3242 if db_nsr:
3243 self._write_ns_status(
3244 nsr_id=nsr_id,
3245 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3246 current_operation="IDLE",
3247 current_operation_id=None,
3248 # error_description=error_description_nsr,
3249 # error_detail=error_detail,
3250 other_update=db_nsr_update
3251 )
3252
3253 if db_nslcmop:
3254 self._write_op_status(
3255 op_id=nslcmop_id,
3256 stage="",
3257 error_message=error_description_nslcmop,
3258 operation_state=nslcmop_operation_state,
3259 other_update=db_nslcmop_update,
3260 )
3261
3262 if nslcmop_operation_state:
3263 try:
3264 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3265 "operationState": nslcmop_operation_state},
3266 loop=self.loop)
3267 except Exception as e:
3268 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3269 self.logger.debug(logging_text + "Exit")
3270 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3271 return nslcmop_operation_state, detailed_status
3272
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute the scale operation 'nslcmop_id' over the ns instance 'nsr_id'.

    Sequence followed, as long as the HA lock for the operation is obtained:
      1. wait for related operations, mark the ns as "SCALING" and read nslcmop, nsr, vnfr and vnfd from database
      2. locate the requested scaling-group-descriptor and check the max/min-instance-count limits
      3. run the 'pre-scale-in'/'pre-scale-out' scaling-config-action primitives
      4. send the "vdu-scaling" action to RO and poll it every 5 seconds, for at most one hour
      5. run the 'post-scale-in'/'post-scale-out' scaling-config-action primitives
      6. always (finally): write the result at 'nslcmops' and 'nsrs', send the kafka "ns"/"scaled" message and
         remove the task from self.lcm_tasks

    Exceptions raised inside these steps (ROClientException, DbException, LcmException, CancelledError or any
    other Exception) are not propagated; they are logged and stored as "FAILED <step>: <error>".

    :param nsr_id: _id of the ns record at 'nsrs' collection
    :param nslcmop_id: _id of the operation record at 'nslcmops' collection
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # previous status is kept to restore it when the operation finishes
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: scale operations already accounted for this scaling group at nsr:_admin.scaling-group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy: the counters are decremented while the last vdur of each vdu are selected
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the original (not decremented) counters and remove them from the primitive information
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name searched; the loop variable can be unbound or hold an unrelated entry
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second argument while the SCALE-RO call below
                    # does not; confirm against the signature of _check_or_add_scale_suboperation
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    # the message has no placeholder; vnf_config_primitive may not exist at this point
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: wait for the RO action to be finished
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    else:
                        # second phase: update vnfr and wait for the management IP address.
                        # ns_status/ns_status_info keep the values of the last check_action_status call
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet; try again at next iteration
                                pass
                        else:
                            assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same arguments as the rest of calls to _update_suboperation_status of this method
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name searched; the loop variable can be unbound or hold an unrelated entry
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(scaling_group,
                                                                                                 vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second argument while the SCALE-RO call above
                    # does not; confirm against the signature of _check_or_add_scale_suboperation
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # mark as failed only the part (VCA or RO) that was in progress when the error happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")