some fixes at ns
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
# Default timeouts. NOTE(review): values look like seconds (e.g. 2 * 3600) - confirm against the callers.
timeout_vca_on_error = 5 * 60  # Time allowed to a charm, counted from the first time it is at blocked/error status, before marking it as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deploying a ns
timeout_ns_terminate = 1800  # default global timeout for undeploying a ns
timeout_charm_delete = 10 * 60  # NOTE(review): presumably the timeout for deleting a charm - confirm
timeout_primitive = 10 * 60  # timeout for a primitive execution
timeout_progress_primitive = 2 * 60  # timeout for observing some progress in a primitive execution

# Negative sentinel values for sub-operation status.
# NOTE(review): exact semantics are defined by code outside this view - confirm before relying on them.
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"  # human readable task name; usage not visible here
58
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Initialise the NS lifecycle manager and build the clients it relies on.

    :param db: database handler, forwarded to the base class
    :param msg: messaging handler, forwarded to the base class
    :param fs: filesystem storage handler, forwarded to the base class
    :param lcm_tasks: task registry, stored as-is
    :param config: two level dictionary with configuration. The keys read here are 'timeout',
        'ro_config' and 'VCA'
    :param loop: asyncio event loop handed to the N2VC connector and the RO client
    :return: None
    """
    super().__init__(db=db, msg=msg, fs=fs, logger=logging.getLogger('lcm.ns'))

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # work on a private copy of the VCA section
    self.vca_config = config["VCA"].copy()
    vca = self.vca_config

    # N2VC (juju) connector; it reports status changes through _on_update_n2vc_db
    vca_url = '{}:{}'.format(vca['host'], vca['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca.get('user', None),
        vca_config=vca,
        on_update_db=self._on_update_n2vc_db
    )

    # K8s connectors: both use the same kubectl binary and none of them notifies db updates
    kubectl_path = vca.get("kubectlpath")
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=vca.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # deployment kind -> connector in charge of it
    self.k8scluster_map = {
        **dict.fromkeys(("helm-chart", "chart"), self.k8sclusterhelm),
        **dict.fromkeys(("juju-bundle", "juju"), self.k8sclusterjuju),
    }

    # RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
116
117 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
118
119 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
120
121 try:
122 # TODO filter RO descriptor fields...
123
124 # write to database
125 db_dict = dict()
126 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
127 db_dict['deploymentStatus'] = ro_descriptor
128 self.update_db_2("nsrs", nsrs_id, db_dict)
129
130 except Exception as e:
131 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
132
133 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
134
135 # remove last dot from path (if exists)
136 if path.endswith('.'):
137 path = path[:-1]
138
139 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
140 # .format(table, filter, path, updated_data))
141
142 try:
143
144 nsr_id = filter.get('_id')
145
146 # read ns record from database
147 nsr = self.db.get_one(table='nsrs', q_filter=filter)
148 current_ns_status = nsr.get('nsState')
149
150 # get vca status for NS
151 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
152
153 # vcaStatus
154 db_dict = dict()
155 db_dict['vcaStatus'] = status_dict
156
157 # update configurationStatus for this VCA
158 try:
159 vca_index = int(path[path.rfind(".")+1:])
160
161 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
162 vca_status = vca_list[vca_index].get('status')
163
164 configuration_status_list = nsr.get('configurationStatus')
165 config_status = configuration_status_list[vca_index].get('status')
166
167 if config_status == 'BROKEN' and vca_status != 'failed':
168 db_dict['configurationStatus'][vca_index] = 'READY'
169 elif config_status != 'BROKEN' and vca_status == 'failed':
170 db_dict['configurationStatus'][vca_index] = 'BROKEN'
171 except Exception as e:
172 # not update configurationStatus
173 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
174
175 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
176 # if nsState = 'DEGRADED' check if all is OK
177 is_degraded = False
178 if current_ns_status in ('READY', 'DEGRADED'):
179 error_description = ''
180 # check machines
181 if status_dict.get('machines'):
182 for machine_id in status_dict.get('machines'):
183 machine = status_dict.get('machines').get(machine_id)
184 # check machine agent-status
185 if machine.get('agent-status'):
186 s = machine.get('agent-status').get('status')
187 if s != 'started':
188 is_degraded = True
189 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
190 # check machine instance status
191 if machine.get('instance-status'):
192 s = machine.get('instance-status').get('status')
193 if s != 'running':
194 is_degraded = True
195 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
196 # check applications
197 if status_dict.get('applications'):
198 for app_id in status_dict.get('applications'):
199 app = status_dict.get('applications').get(app_id)
200 # check application status
201 if app.get('status'):
202 s = app.get('status').get('status')
203 if s != 'active':
204 is_degraded = True
205 error_description += 'application {} status={} ; '.format(app_id, s)
206
207 if error_description:
208 db_dict['errorDescription'] = error_description
209 if current_ns_status == 'READY' and is_degraded:
210 db_dict['nsState'] = 'DEGRADED'
211 if current_ns_status == 'DEGRADED' and not is_degraded:
212 db_dict['nsState'] = 'READY'
213
214 # write to database
215 self.update_db_2("nsrs", nsr_id, db_dict)
216
217 except (asyncio.CancelledError, asyncio.TimeoutError):
218 raise
219 except Exception as e:
220 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
221
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Builds the vnfd descriptor to be sent to RO from an OSM IM vnfd. The input is not modified.
    The cloud-init content of each vdu (inline or read from the package file) is rendered as a
    Jinja2 template using additionalParams.
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided
    :param nsrId: Id of the NSR (not used here)
    :return: copy of vnfd adapted for RO
    :raises LcmException: if a template variable is not provided, the cloud-init file cannot be
        read or the template cannot be parsed
    """
    # keys not used by RO: internal ones, configuration, monitoring, scaling and k8s
    keys_ignored_by_ro = ("_id", "_admin", "vnf-configuration", "monitoring-param",
                          "scaling-group-descriptor", "kdu", "k8s-cluster")
    try:
        vnfd_RO = deepcopy(vnfd)
        for ignored_key in keys_ignored_by_ro:
            vnfd_RO.pop(ignored_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # file content is read from the package storage and inlined as 'cloud-init'
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable used by the template must come in the instantiation params
            parsed_content = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_content):
                if additionalParams and var in additionalParams.keys():
                    continue
                raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                   "file, must be provided in the instantiation parameters inside the "
                                   "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
279
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to relate each member-vnf-index with
        its vnfd
    :param vnfd_dict: dictionary of vnfd contents, indexed by the 'vnfd-id-ref' used at nsd:constituent-vnfd
    :param n2vc_key_list: keys injected as 'mgmt_keys' to the vdus that require ssh access. Can be None
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM/WIM account is not ENABLED, or if a parameter references a
        member-vnf-index, internal connection point or connection point not present at the descriptors
    """
    # caches of OSM account id -> RO id, filled by the two helpers below
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # Returns the RO id of an OSM vim account; the database is read once per account
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # Returns the RO id of an OSM wim account. A value that is not a string (e.g. None)
        # is returned unmodified
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # Returns a copy of the ip-profile with the keys renamed as done below:
        # dns-server -> dns-address, ipv4/ipv6 -> IPv4/IPv6, dhcp-params -> dhcp
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # Step 1: inject the n2vc keys ('mgmt_keys') at the vdus whose configuration requires ssh access
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    # the management vdu is looked up below, by its external connection point
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                for vdu_interface in vdu.get("interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        # only the first vdu connected to the management cp is considered
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # the same vnfd can be used by several members of the ns; keys are added to all of them
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    # Step 2: ns level parameters copied as they are
    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # Step 3: per vnf parameters ('vnf' list of the instantiation params)
    for vnf_params in get_iterable(ns_params, "vnf"):
        # get the vnfd of this member-vnf-index; the 'else' is run when the loop ends without a match
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        # vdu parameters: existing volumes and interface addresses
        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        # internal networks of the vnf
        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                # iface_found is needed to leave both loops; the 'else' belongs to the outer loop and
                # is run when no vdu interface references this internal connection point
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # Step 4: ns level networks ('vld' list of the instantiation params)
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            # NOTE(review): the trailing comma makes this statement a one-element tuple expression.
            # It is harmless (populate_dict is still called) but it looks unintended
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"])),
        if vld_params.get("vim-network-name"):
            # either a dictionary {vim_account: network name} or a single name for the default vim
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            # same format as vim-network-name; it is written at the same 'sites' key
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                # After these loops vdu_descriptor and interface_descriptor are the ones connected to
                # the connection point; the 'else' is run when there is no match
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
550
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds or removes vdur entries of a vnfr, both at db_vnfr and at the 'vnfrs' database record.

    New entries are copies of the last existing vdur of the same vdu, with a new '_id' and
    consecutive 'count-index'. Entries with 'pdu-type' are never touched.

    :param db_vnfr: vnfr content. Its 'vdur' list is modified
    :param vdu_create: dictionary {vdu-id-ref: number of vdur to add}. It is not modified
    :param vdu_delete: dictionary {vdu-id-ref: number of vdur to remove}. It is not modified
    :return: None
    :raises LcmException: if some requested vdu has not any (or not enough) existing vdur
    """
    # make a copy to do not change the caller dictionaries
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # iterate backwards, so that the last vdur of each vdu is found first and that insertions
    # and deletions do not affect the positions pending to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new vdur is a copy of the previous one, so count-index is consecutive
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each direction is checked on its own: testing
    # 'vdu_create or vdu_delete' here reported a pending scale IN as a scale OUT error
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
589
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the information of its network created at RO
    (vim-id, name, status and status-detailed)
    :param ns_update_nsr: dictionary to be filled with the changes, using keys 'vld.<index>'
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Networks are read from its 'nets' list
    :return: Nothing, LcmException is raised if a vld is not found at the RO info
    """
    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        # RO identifies the ns networks with 'ns_net_osm_id'; only the first match is used
        matching_nets = (ro_net for ro_net in get_iterable(nsr_desc_RO, "nets")
                         if ns_vld["id"] == ro_net.get("ns_net_osm_id"))
        ro_net = next(matching_nets, None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))

        ns_vld["vim-id"] = ro_net.get("vim_net_id")
        ns_vld["name"] = ro_net.get("vim_name")
        ns_vld["status"] = ro_net.get("status")
        ns_vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = ns_vld
611
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Marks the provided vnfrs as ERROR, both at the dictionaries received and at the 'vnfrs' database.

    The vnfr 'status' is always set to ERROR. A vdur is set to ERROR only when it has not any
    'status' yet, and in that case error_text (if provided) is stored as its 'status-detailed'.
    Database errors are logged and not propagated.

    :param db_vnfrs: dictionary with the vnfr contents as values. Their vdur entries are modified
    :param error_text: detail of the error. Can be empty
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # the same text goes to memory and to database; the database was
                        # getting the literal "ERROR" instead of the error detail
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
626
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' list (with the 'vms' of each one) and its
        'nets' list are used
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when a vnf with
        vdus has not any ip address, neither at the RO info nor at the vnfr
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for this vnf at the RO info; the 'else' at the end is run when it is not found
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            # RO can provide several addresses separated by ';'. Only the first one is used
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # a vnf without VDUs has no ip_address, so it is not an error
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # position of the current RO vm among the ones of the same vdu; it is compared
                # with the vdur 'count-index' to select the right replica
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # interfaces are matched by name with the RO 'internal_name'
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal networks of the vnf are identified at RO by 'vnf_net_osm_id'
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # one single database write per vnfr, with all the changes collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
699
700 def _get_ns_config_info(self, nsr_id):
701 """
702 Generates a mapping between vnf,vdu elements and the N2VC id
703 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
704 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
705 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
706 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
707 """
708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
709 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
710 mapping = {}
711 ns_config_info = {"osm-config-mapping": mapping}
712 for vca in vca_deployed_list:
713 if not vca["member-vnf-index"]:
714 continue
715 if not vca["vdu_id"]:
716 mapping[vca["member-vnf-index"]] = vca["application"]
717 else:
718 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
719 vca["application"]
720 return ns_config_info
721
722 @staticmethod
723 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
724 """
725 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
726 primitives as verify-ssh-credentials, or config when needed
727 :param desc_primitive_list: information of the descriptor
728 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
729 this element contains a ssh public key
730 :return: The modified list. Can ba an empty list, but always a list
731 """
732 if desc_primitive_list:
733 primitive_list = desc_primitive_list.copy()
734 else:
735 primitive_list = []
736 # look for primitive config, and get the position. None if not present
737 config_position = None
738 for index, primitive in enumerate(primitive_list):
739 if primitive["name"] == "config":
740 config_position = index
741 break
742
743 # for NS, add always a config primitive if not present (bug 874)
744 if not vca_deployed["member-vnf-index"] and config_position is None:
745 primitive_list.insert(0, {"name": "config", "parameter": []})
746 config_position = 0
747 # for VNF/VDU add verify-ssh-credentials after config
748 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
749 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
750 return primitive_list
751
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Create the vnfds, the nsd and the ns instance at RO and wait until RO reports the ns as ACTIVE.

    Progress is written to the "nsrs" record (through update_db_2) and to the operation record
    (through _write_op_status) every time stage[2] changes.
    :param logging_text: prefix use for logging
    :param nsr_id: id of the "nsrs" record to update; also used as prefix of the ids created at RO
    :param nsd: ns descriptor; "id", "constituent-vnfd" and "vld" are read from it
    :param db_nsr: nsrs record; _admin.deployed.RO is read and its "vnfd" list is modified in place
    :param db_nslcmop: operation record; "_id" and "operationParams" are read from it
    :param db_vnfrs: vnf records indexed by member-vnf-index
    :param db_vnfds_ref: vnf descriptors indexed by the "vnfd-id-ref" used at the nsd
    :param n2vc_key_list: passed untouched to ns_params_2_RO
    :param stage: list whose third element (stage[2]) is overwritten with the current step text
    :return: None
    :raises: ROclient.ROClientException, LcmException and DbException are re-raised after setting stage[2] to an
        error text and calling set_vnfr_at_error
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0   # number of descriptors created at RO
        vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # the timeout provided at the operation has priority over the configured/default one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        await self.do_placement(logging_text, db_nslcmop, db_vnfrs)

        # deploy RO

        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            # id used at RO: "<nsr_id>.<descriptor counter>.<member-vnf-index truncated to 23 characters>"
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            # (for/else: the else branch runs only when no entry matched, i.e. the loop did not break)
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                # reuse the descriptor already present at RO
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            # keep the pending database update and the in-memory record aligned
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # work over a copy: the descriptor sent to RO uses the RO ids and must not contain "_id" nor "_admin"
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            # replace every reference to a vnfd by the id generated above for RO
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        # NOTE(review): the text below repeats the one used for the nsd creation step - confirm it is intended
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                # a NOT_FOUND answer only means that the ns must be created again; other errors are propagated
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    # an existing ns in ERROR is deleted, so that it is created again below
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait look if previous tasks in process
            # NOTE(review): ns_params is indexed here without the None check done at the top - confirm that
            # "operationParams" is always present
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            # same wait for the vim accounts provided per vnf
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                        if task_dependency:
                            stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                            self.logger.debug(logging_text + stage[2])
                            await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        # poll RO every 5 seconds; the else branch of the while runs only when the time is over without a break
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

            ns_status, ns_status_info = self.RO.check_ns_status(desc)
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                stage[2] = "VIM: ({})".format(ns_status_info)
            elif ns_status == "ACTIVE":
                stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                try:
                    self.ns_update_vnfr(db_vnfrs, desc)
                    break
                except LcmExceptionNoMgmtIP:
                    # management ip not available yet: keep polling
                    pass
            else:
                # NOTE(review): asserts are removed when python runs with -O; in that case an unknown status
                # would silently keep polling until the timeout
                assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
            # write to database only when the text has changed
            if stage[2] != detailed_status_old:
                detailed_status_old = stage[2]
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
            await asyncio.sleep(5, loop=self.loop)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException) as e:
        # mark the vnfrs with the error text and let the caller manage the exception
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
970
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    The "vnfrs" record is polled every 10 seconds until the target vdu has an ip address. The key is injected
    only when both pub_key and user are provided, and never into a PDU.
    :param logging_text: prefix use for logging
    :param nsr_id: id of the "nsrs" record, where _admin.deployed.RO.nsr_id is looked for
    :param vnfr_id: id of the "vnfrs" record that contains the target
    :param vdu_id: "vdu-id-ref" of the target vdu. If empty, the vdu that holds the vnf "ip-address" is used
    :param vdu_index: "count-index" of the target vdu, used only when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on the target in ERROR status, target vdu not found, unexpected answer from RO, too many
        iterations of the loop or too many failed injections
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0              # number of failed key injections
    target_vdu_id = None      # set once the target vdu with ip address is found; the vnfr is not read again after
    ro_retries = 0            # number of iterations of the loop below, whatever the reason to iterate

    while True:

        ro_retries += 1
        # NOTE(review): this limit counts every iteration (waiting for the ip address included), although the
        # text only talks about the RO nsr_id
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        await asyncio.sleep(10, loop=self.loop)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                # the target is the vdu that owns the ip address of the vnf
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            # a PDU is accepted whatever its status; a VM must be ACTIVE
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                # nothing to inject at a PDU: log it and return the ip address
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                # only the first result is evaluated: 200 means done, anything else triggers a retry
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # log only the first failure; retry at the next iteration (10 seconds later) up to 20 times
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            # no key to inject: having the ip address is enough
            break

    return ip_address
1072
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the configurations this VCA depends on are READY. NS wait for VNFs and VDUs. VNFs for VDUs
    :param nsr_id: id of the "nsrs" record polled for "configurationStatus"
    :param vca_deployed_list: list of deployed VCAs; only the entry at vca_index is read
    :param vca_index: position of this VCA at vca_deployed_list and at "configurationStatus"
    :return: None, as soon as no pending dependency is found
    :raises LcmException: when a dependency is BROKEN or the polling is exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # a vdu or a kdu does not depend on anything
        return

    remaining_polls = 300   # decremented once per 10 seconds sleep
    while remaining_polls >= 0:
        nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = nsr_record["_admin"]["deployed"]["VCA"]
        status_entries = nsr_record["configurationStatus"]

        # NOTE(review): the entries inspected come from "configurationStatus", but they are asked for
        # "member-vnf-index" - confirm that those entries really contain that key
        pending_dependency = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # skip myself
                continue
            is_dependency = (not own_vca.get("member-vnf-index") or
                             entry.get("member-vnf-index") == own_vca.get("member-vnf-index"))
            if not is_dependency:
                continue
            current_status = entry.get("status")
            if current_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if current_status != 'READY':
                # at least one dependency is still in progress
                pending_dependency = True
                break

        if not pending_dependency:
            return
        await asyncio.sleep(10)
        remaining_polls -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1106
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
    """
    Deploy one VCA (charm) through self.n2vc and execute its initial config primitives.
    Steps: create (proxy charm) or register (native charm) the execution environment, install the configuration
    software, add relations, inject the ssh key when required (proxy charm), wait for the dependent VCAs and
    execute the "initial-config-primitive" list. The progress is written at "configurationStatus.<vca_index>".
    :param logging_text: prefix use for logging
    :param vca_index: position of this VCA at db_nsr["_admin"]["deployed"]["VCA"] and at "configurationStatus"
    :param nsi_id: first component of the namespace; an empty string is used when not provided
    :param db_nsr: nsrs record; "_id" and "_admin.deployed.VCA" are read
    :param db_vnfr: vnfrs record, or a false value for a charm at NS level
    :param vdu_id: vdu of the charm, if any
    :param kdu_name: kdu of the charm, if any; only considered when vdu_id is not provided
    :param vdu_index: index of the vdu; 0 is used for the namespace when not provided
    :param config_descriptor: configuration section of the descriptor. The keys "juju", "config-access",
        "initial-config-primitive" and "terminate-config-primitive" are read
    :param deploy_params: params used to fill the primitives. "rw_mgmt_ip" and, for NS charms, "ns_config_info"
        are added to it
    :param base_folder: dictionary with the keys "folder" and "pkg-dir", used to compose the charm path
    :param nslcmop_id: operation id, where the stage is written
    :param stage: list whose first element (stage[0]) is overwritten before executing the primitives
    :return: None
    :raises LcmException: any exception is converted to LcmException, after writing the status BROKEN
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    # location at database, provided to every n2vc call
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""   # current step, used to compose the error text
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        # namespace is "<nsi>.<ns>[.<vnfr>[.<vdu>-<index> | .<kdu>]]"
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = 'KDU'
                element_under_configuration = kdu_name

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor contains exactly "proxy: False"
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # native charm: the VM must be up before registering it as execution environment
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, they need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        db_nsr_update = {db_update_entry + "model": model_name,
                         db_update_entry + "application": application_name,
                         db_update_entry + "ee_id": ee_id}

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if not is_proxy_charm:
            # native charm: the params of the primitive named "config", if any, are provided at installation
            initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
            if initial_config_primitive_list:
                for primitive in initial_config_primitive_list:
                    if primitive["name"] == "config":
                        config = self._map_primitive_params(
                            primitive,
                            {},
                            deploy_params
                        )
                        break
        await self.n2vc.install_configuration_sw(
            ee_id=ee_id,
            artifact_path=artifact_path,
            db_dict=db_dict,
            config=config
        )

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                # note that the list of the descriptor is sorted in place
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # a wrong or missing 'seq' is only logged; the execution continues with the list as it is
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # only unexpected exceptions are logged with traceback
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        # the failing step is added to the text; the original exception is kept as cause
        raise LcmException("{} {}".format(step, e)) from e
1378
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    A database error is not propagated; it is only logged as a warning.
    :param nsr_id: id of the "nsrs" record to update
    :param ns_state: new "nsState". It is not written if empty
    :param current_operation: new "currentOperation". "IDLE" stores None at "_admin.operation-type"
    :param current_operation_id: id of the operation in course
    :param error_description: written at "errorDescription", also when it is None
    :param error_detail: written at "errorDetail", also when it is None
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None
    """
    try:
        # other_update is used in place (not copied), so the status fields are added to the caller dictionary
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id    # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13); use Logger.warning
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1407
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the status fields of a "nslcmops" record.
    A database error is not propagated; it is only logged as a warning.
    :param op_id: id of the "nslcmops" record to update
    :param stage: if it is a list, the first item is written at "stage" and all the items joined by blanks at
        "detailed-status". Any other value different from None is written at "stage" as string
    :param error_message: written at "errorMessage" if it is not None
    :param queuePosition: written always at "queuePosition"
    :param operation_state: if it is not None, it is written at "operationState" together with the current time
        at "statusEnteredTime"
    :param other_update: other changes for the same record. It is used in place (not copied)
    :return: None
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13); use Logger.warning
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1427
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non empty entry of "configurationStatus" of a "nsrs" record.
    Nothing is written if the record has no "configurationStatus". A database error is not propagated; it is
    only logged as a warning.
    :param db_nsr: nsrs record; "_id" and "configurationStatus" are read
    :param status: value to write at "configurationStatus.<index>.status"
    :return: None
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # empty entries (None, {}) are skipped, but they count for the index
            db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
                             enumerate(config_status) if v}
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13); use Logger.warning
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1441
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None,
                                other_update: dict = None):
    """
    Update the entry "configurationStatus.<vca_index>" of a "nsrs" record.
    Only the fields provided with a non empty value are written. A database error is not propagated; it is only
    logged as a warning.
    :param nsr_id: id of the "nsrs" record to update
    :param vca_index: index of the entry at "configurationStatus"
    :param status: written at "status"
    :param element_under_configuration: written at "elementUnderConfiguration"
    :param element_type: written at "elementType"
    :param other_update: other changes for the same record. It is used in place (not copied)
    :return: None
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed at python 3.13); use Logger.warning
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1462
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement to the "PLA" engine, if the operation asks for it, and store the result at the vnfrs.
    The request is sent through self.msg and the answer is polled at "_admin.pla" of the "nslcmops" record.
    :param logging_text: prefix use for logging
    :param db_nslcmop: operation record; "operationParams.placement-engine", "id" and "_id" are read
    :param db_vnfrs: vnf records indexed by member-vnf-index
    :return: None
    :raises LcmException: when no result is found at database after the polling
    """
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # placement not requested: nothing to do
        return

    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)

    # poll the operation record until the result appears or the budget is consumed
    poll_period = 5
    remaining = poll_period * 4
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_period)
        remaining -= poll_period
        db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))

    # store the selected vim account at every known vnfr; entries without vim account are ignored
    for vnf_placement in placement['vnf']:
        target_vnfr = db_vnfrs.get(vnf_placement['member-vnf-index'])
        if vnf_placement.get('vimAccountId') and target_vnfr:
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]},
                            {"vim-account-id": vnf_placement['vimAccountId']})
    return
1486
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at "_admin.pla" of the "nslcmops" record that requested it.
    Any error is only logged as a warning; nothing is propagated.
    :param params: dictionary with the key "placement", whose "nslcmopId" identifies the record to update
    :return: None
    """
    # bound before the try, so that the except block can always use it at the log text
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias (removed at python 3.13); use Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1493
1494 async def instantiate(self, nsr_id, nslcmop_id):
1495 """
1496
1497 :param nsr_id: ns instance to deploy
1498 :param nslcmop_id: operation to run
1499 :return:
1500 """
1501
1502 # Try to lock HA task here
1503 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1504 if not task_is_locked_by_me:
1505 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1506 return
1507
1508 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1509 self.logger.debug(logging_text + "Enter")
1510
1511 # get all needed from database
1512
1513 # database nsrs record
1514 db_nsr = None
1515
1516 # database nslcmops record
1517 db_nslcmop = None
1518
1519 # update operation on nsrs
1520 db_nsr_update = {}
1521 # update operation on nslcmops
1522 db_nslcmop_update = {}
1523
1524 nslcmop_operation_state = None
1525 db_vnfrs = {} # vnf's info indexed by member-index
1526 # n2vc_info = {}
1527 tasks_dict_info = {} # from task to info text
1528 exc = None
1529 error_list = []
1530 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1531 # ^ stage, step, VIM progress
1532 try:
1533 # wait for any previous tasks in process
1534 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1535
1536 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1537 stage[1] = "Reading from database,"
1538 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1539 db_nsr_update["detailed-status"] = "creating"
1540 db_nsr_update["operational-status"] = "init"
1541 self._write_ns_status(
1542 nsr_id=nsr_id,
1543 ns_state="BUILDING",
1544 current_operation="INSTANTIATING",
1545 current_operation_id=nslcmop_id,
1546 other_update=db_nsr_update
1547 )
1548 self._write_op_status(
1549 op_id=nslcmop_id,
1550 stage=stage,
1551 queuePosition=0
1552 )
1553
1554 # read from db: operation
1555 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1556 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1557 ns_params = db_nslcmop.get("operationParams")
1558 if ns_params and ns_params.get("timeout_ns_deploy"):
1559 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1560 else:
1561 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1562
1563 # read from db: ns
1564 stage[1] = "Getting nsr={} from db".format(nsr_id)
1565 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1566 # nsd is replicated into ns (no db read)
1567 nsd = db_nsr["nsd"]
1568 # nsr_name = db_nsr["name"] # TODO short-name??
1569
1570 # read from db: vnf's of this ns
1571 stage[1] = "Getting vnfrs from db"
1572 self.logger.debug(logging_text + stage[1])
1573 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1574
1575 # read from db: vnfd's for every vnf
1576 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1577 db_vnfds = {} # every vnfd data indexed by vnf id
1578 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1579
1580 # for each vnf in ns, read vnfd
1581 for vnfr in db_vnfrs_list:
1582 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1583 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1584 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1585 # if we haven't this vnfd, read it from db
1586 if vnfd_id not in db_vnfds:
1587 # read from db
1588 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1589 self.logger.debug(logging_text + stage[1])
1590 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1591
1592 # store vnfd
1593 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1594 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1595 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1596
1597 # Get or generates the _admin.deployed.VCA list
1598 vca_deployed_list = None
1599 if db_nsr["_admin"].get("deployed"):
1600 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1601 if vca_deployed_list is None:
1602 vca_deployed_list = []
1603 configuration_status_list = []
1604 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1605 db_nsr_update["configurationStatus"] = configuration_status_list
1606 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1607 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1608 elif isinstance(vca_deployed_list, dict):
1609 # maintain backward compatibility. Change a dict to list at database
1610 vca_deployed_list = list(vca_deployed_list.values())
1611 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1612 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1613
1614 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1615 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1616 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1617
1618 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1619 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1620 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1621
1622 # n2vc_redesign STEP 2 Deploy Network Scenario
1623 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1624 self._write_op_status(
1625 op_id=nslcmop_id,
1626 stage=stage
1627 )
1628
1629 stage[1] = "Deploying KDUs,"
1630 # self.logger.debug(logging_text + "Before deploy_kdus")
1631 # Call to deploy_kdus in case exists the "vdu:kdu" param
1632 await self.deploy_kdus(
1633 logging_text=logging_text,
1634 nsr_id=nsr_id,
1635 nslcmop_id=nslcmop_id,
1636 db_vnfrs=db_vnfrs,
1637 db_vnfds=db_vnfds,
1638 task_instantiation_info=tasks_dict_info,
1639 )
1640
1641 stage[1] = "Getting VCA public key."
1642 # n2vc_redesign STEP 1 Get VCA public ssh-key
1643 # feature 1429. Add n2vc public key to needed VMs
1644 n2vc_key = self.n2vc.get_public_key()
1645 n2vc_key_list = [n2vc_key]
1646 if self.vca_config.get("public_key"):
1647 n2vc_key_list.append(self.vca_config["public_key"])
1648
1649 stage[1] = "Deploying NS at VIM."
1650 task_ro = asyncio.ensure_future(
1651 self.instantiate_RO(
1652 logging_text=logging_text,
1653 nsr_id=nsr_id,
1654 nsd=nsd,
1655 db_nsr=db_nsr,
1656 db_nslcmop=db_nslcmop,
1657 db_vnfrs=db_vnfrs,
1658 db_vnfds_ref=db_vnfds_ref,
1659 n2vc_key_list=n2vc_key_list,
1660 stage=stage
1661 )
1662 )
1663 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1664 tasks_dict_info[task_ro] = "Deploying at VIM"
1665
1666 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1667 stage[1] = "Deploying Execution Environments."
1668 self.logger.debug(logging_text + stage[1])
1669
1670 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1671 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1672 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1673 vnfd_id = c_vnf["vnfd-id-ref"]
1674 vnfd = db_vnfds_ref[vnfd_id]
1675 member_vnf_index = str(c_vnf["member-vnf-index"])
1676 db_vnfr = db_vnfrs[member_vnf_index]
1677 base_folder = vnfd["_admin"]["storage"]
1678 vdu_id = None
1679 vdu_index = 0
1680 vdu_name = None
1681 kdu_name = None
1682
1683 # Get additional parameters
1684 deploy_params = {}
1685 if db_vnfr.get("additionalParamsForVnf"):
1686 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1687
1688 descriptor_config = vnfd.get("vnf-configuration")
1689 if descriptor_config and descriptor_config.get("juju"):
1690 self._deploy_n2vc(
1691 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1692 db_nsr=db_nsr,
1693 db_vnfr=db_vnfr,
1694 nslcmop_id=nslcmop_id,
1695 nsr_id=nsr_id,
1696 nsi_id=nsi_id,
1697 vnfd_id=vnfd_id,
1698 vdu_id=vdu_id,
1699 kdu_name=kdu_name,
1700 member_vnf_index=member_vnf_index,
1701 vdu_index=vdu_index,
1702 vdu_name=vdu_name,
1703 deploy_params=deploy_params,
1704 descriptor_config=descriptor_config,
1705 base_folder=base_folder,
1706 task_instantiation_info=tasks_dict_info,
1707 stage=stage
1708 )
1709
1710 # Deploy charms for each VDU that supports one.
1711 for vdud in get_iterable(vnfd, 'vdu'):
1712 vdu_id = vdud["id"]
1713 descriptor_config = vdud.get('vdu-configuration')
1714 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1715 if vdur.get("additionalParams"):
1716 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1717 else:
1718 deploy_params_vdu = deploy_params
1719 if descriptor_config and descriptor_config.get("juju"):
1720 # look for vdu index in the db_vnfr["vdu"] section
1721 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1722 # if vdur["vdu-id-ref"] == vdu_id:
1723 # break
1724 # else:
1725 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1726 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1727 # vdu_name = vdur.get("name")
1728 vdu_name = None
1729 kdu_name = None
1730 for vdu_index in range(int(vdud.get("count", 1))):
1731 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1732 self._deploy_n2vc(
1733 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1734 member_vnf_index, vdu_id, vdu_index),
1735 db_nsr=db_nsr,
1736 db_vnfr=db_vnfr,
1737 nslcmop_id=nslcmop_id,
1738 nsr_id=nsr_id,
1739 nsi_id=nsi_id,
1740 vnfd_id=vnfd_id,
1741 vdu_id=vdu_id,
1742 kdu_name=kdu_name,
1743 member_vnf_index=member_vnf_index,
1744 vdu_index=vdu_index,
1745 vdu_name=vdu_name,
1746 deploy_params=deploy_params_vdu,
1747 descriptor_config=descriptor_config,
1748 base_folder=base_folder,
1749 task_instantiation_info=tasks_dict_info,
1750 stage=stage
1751 )
1752 for kdud in get_iterable(vnfd, 'kdu'):
1753 kdu_name = kdud["name"]
1754 descriptor_config = kdud.get('kdu-configuration')
1755 if descriptor_config and descriptor_config.get("juju"):
1756 vdu_id = None
1757 vdu_index = 0
1758 vdu_name = None
1759 # look for vdu index in the db_vnfr["vdu"] section
1760 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1761 # if vdur["vdu-id-ref"] == vdu_id:
1762 # break
1763 # else:
1764 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1765 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1766 # vdu_name = vdur.get("name")
1767 # vdu_name = None
1768
1769 self._deploy_n2vc(
1770 logging_text=logging_text,
1771 db_nsr=db_nsr,
1772 db_vnfr=db_vnfr,
1773 nslcmop_id=nslcmop_id,
1774 nsr_id=nsr_id,
1775 nsi_id=nsi_id,
1776 vnfd_id=vnfd_id,
1777 vdu_id=vdu_id,
1778 kdu_name=kdu_name,
1779 member_vnf_index=member_vnf_index,
1780 vdu_index=vdu_index,
1781 vdu_name=vdu_name,
1782 deploy_params=deploy_params,
1783 descriptor_config=descriptor_config,
1784 base_folder=base_folder,
1785 task_instantiation_info=tasks_dict_info,
1786 stage=stage
1787 )
1788
1789 # Check if this NS has a charm configuration
1790 descriptor_config = nsd.get("ns-configuration")
1791 if descriptor_config and descriptor_config.get("juju"):
1792 vnfd_id = None
1793 db_vnfr = None
1794 member_vnf_index = None
1795 vdu_id = None
1796 kdu_name = None
1797 vdu_index = 0
1798 vdu_name = None
1799
1800 # Get additional parameters
1801 deploy_params = {}
1802 if db_nsr.get("additionalParamsForNs"):
1803 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1804 base_folder = nsd["_admin"]["storage"]
1805 self._deploy_n2vc(
1806 logging_text=logging_text,
1807 db_nsr=db_nsr,
1808 db_vnfr=db_vnfr,
1809 nslcmop_id=nslcmop_id,
1810 nsr_id=nsr_id,
1811 nsi_id=nsi_id,
1812 vnfd_id=vnfd_id,
1813 vdu_id=vdu_id,
1814 kdu_name=kdu_name,
1815 member_vnf_index=member_vnf_index,
1816 vdu_index=vdu_index,
1817 vdu_name=vdu_name,
1818 deploy_params=deploy_params,
1819 descriptor_config=descriptor_config,
1820 base_folder=base_folder,
1821 task_instantiation_info=tasks_dict_info,
1822 stage=stage
1823 )
1824
1825 # rest of staff will be done at finally
1826
1827 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1828 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1829 exc = e
1830 except asyncio.CancelledError:
1831 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1832 exc = "Operation was cancelled"
1833 except Exception as e:
1834 exc = traceback.format_exc()
1835 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1836 finally:
1837 if exc:
1838 error_list.append(str(exc))
1839 try:
1840 # wait for pending tasks
1841 if tasks_dict_info:
1842 stage[1] = "Waiting for instantiate pending tasks."
1843 self.logger.debug(logging_text + stage[1])
1844 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1845 stage, nslcmop_id, nsr_id=nsr_id)
1846 stage[1] = stage[2] = ""
1847 except asyncio.CancelledError:
1848 error_list.append("Cancelled")
1849 # TODO cancel all tasks
1850 except Exception as exc:
1851 error_list.append(str(exc))
1852
1853 # update operation-status
1854 db_nsr_update["operational-status"] = "running"
1855 # let's begin with VCA 'configured' status (later we can change it)
1856 db_nsr_update["config-status"] = "configured"
1857 for task, task_name in tasks_dict_info.items():
1858 if not task.done() or task.cancelled() or task.exception():
1859 if task_name.startswith(self.task_name_deploy_vca):
1860 # A N2VC task is pending
1861 db_nsr_update["config-status"] = "failed"
1862 else:
1863 # RO or KDU task is pending
1864 db_nsr_update["operational-status"] = "failed"
1865
1866 # update status at database
1867 if error_list:
1868 error_detail = ". ".join(error_list)
1869 self.logger.error(logging_text + error_detail)
1870 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1871 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1872
1873 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1874 db_nslcmop_update["detailed-status"] = error_detail
1875 nslcmop_operation_state = "FAILED"
1876 ns_state = "BROKEN"
1877 else:
1878 error_detail = None
1879 error_description_nsr = error_description_nslcmop = None
1880 ns_state = "READY"
1881 db_nsr_update["detailed-status"] = "Done"
1882 db_nslcmop_update["detailed-status"] = "Done"
1883 nslcmop_operation_state = "COMPLETED"
1884
1885 if db_nsr:
1886 self._write_ns_status(
1887 nsr_id=nsr_id,
1888 ns_state=ns_state,
1889 current_operation="IDLE",
1890 current_operation_id=None,
1891 error_description=error_description_nsr,
1892 error_detail=error_detail,
1893 other_update=db_nsr_update
1894 )
1895 if db_nslcmop:
1896 self._write_op_status(
1897 op_id=nslcmop_id,
1898 stage="",
1899 error_message=error_description_nslcmop,
1900 operation_state=nslcmop_operation_state,
1901 other_update=db_nslcmop_update,
1902 )
1903
1904 if nslcmop_operation_state:
1905 try:
1906 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1907 "operationState": nslcmop_operation_state},
1908 loop=self.loop)
1909 except Exception as e:
1910 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1911
1912 self.logger.debug(logging_text + "Exit")
1913 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1914
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations in which the VCA stored at 'vca_index' takes part.

    Steps:
      1. find all the relations declared for this VCA, both at nsd 'ns-configuration' and at every vnfd
         'vnf-configuration'
      2. poll the nsr record until both peers of a relation have 'config_sw_installed' set
      3. add the relation by means of self.n2vc.add_relation

    A relation is dropped, without being added, when one of its peers has status 'BROKEN' at the nsr
    'configurationStatus' list.

    :param logging_text: prefix used at log messages
    :param nsr_id: _id of the nsr record at database
    :param vca_index: index of this VCA at db_nsr._admin.deployed.VCA
    :param timeout: maximum time, in seconds, waiting for the peers to be ready
    :return: True when no relation is left pending; False on timeout or on any error (that is logged)
    """

    def _peer_ids(relation):
        # a relation is declared between exactly two entities
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    async def _process_relations(relations, vca_key, db_nsr):
        """
        Try to add every pending relation.
        :param relations: list of relations still pending
        :param vca_key: key of the VCA entry compared against the entity id: 'member-vnf-index' for ns
            relations, 'vdu_id' for vnf relations
        :param db_nsr: nsr record just read from database
        :return: list of relations that must be retried later. A new list is built instead of removing items
            from the one being iterated, because that would skip entries
        """
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA')) or ()
        vca_status_list = db_nsr.get('configurationStatus') or ()
        still_pending = []
        for r in relations:
            from_entity, to_entity = r.get('entities')[0], r.get('entities')[1]
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # entries of the VCA list can be empty; only installed peers can be related
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_entity.get('id'):
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_entity.get('id'):
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # both peers are ready: add relation and forget about it
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=from_entity.get('endpoint'),
                    endpoint_2=to_entity.get('endpoint'))
                continue

            # check failed peers. VCA list and configurationStatus list share the same index
            peer_ids = (from_entity.get('id'), to_entity.get('id'))
            peer_broken = any(
                vca and vca.get(vca_key) in peer_ids and (vca_status or {}).get('status') == 'BROKEN'
                for vca, vca_status in zip(vca_list, vca_status_list))
            if peer_broken:
                # peer broken: this relation will never be possible, do not retry it
                self.logger.debug(logging_text + ' peer is BROKEN, relation dropped: {}'.format(r))
            else:
                still_pending.append(r)
        return still_pending

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is one of the peers
        ns_relations = [r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
                        if my_vca.get('member-vnf-index') in _peer_ids(r)]

        # read all vnf-configuration relations where this VCA is one of the peers
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            vnf_relations += [r for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()
                              if my_vca.get('vdu_id') in _peer_ids(r)]

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEPS 2 and 3: wait for peers and add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (other tasks update the record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            ns_relations = await _process_relations(ns_relations, 'member-vnf-index', db_nsr)
            vnf_relations = await _process_relations(vnf_relations, 'vdu_id', db_nsr)

            # finish as soon as nothing is pending, instead of sleeping first
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(5.0)

    except asyncio.CancelledError:
        # a cancellation must reach the caller; it is not an error adding relations
        raise
    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2074
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch kdus if present in the descriptor. One asyncio task is created and registered per kdu found at
    the 'kdur' list of every vnfr; this method does not wait for those tasks.
    :param logging_text: prefix used at log messages
    :param nsr_id: _id of the nsr record. Its '_admin.deployed.K8s' list is reset and filled here
    :param nslcmop_id: operation id used to register the created tasks
    :param db_vnfrs: dictionary with the vnfr records; only its values are used
    :param db_vnfds: dictionary with the vnfd records indexed by vnfd _id, used to get the package storage
    :param task_instantiation_info: dictionary where every created task is added, with a description as value
    :return: None
    :raises LcmException: when the kdu type is unknown, the k8s cluster is not found or not initialized, or
        any other exception happens (that is logged and converted into LcmException)
    """

    k8scluster_id_2_uuid = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # results are cached, so that each cluster is read from database once per cluster type
        if cluster_id in k8scluster_id_2_uuid[cluster_type]:
            return k8scluster_id_2_uuid[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuid[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    # created before the 'try', because it is read at the 'finally' clause
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        # NOTE: the key is "pkg-dir". A stray quote in the key made this lookup always fail
                        # and, as the error was silently ignored below, packaged models were never used
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file; kdumodel is used as it comes at the kdur
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # helm repos are synchronized only once per cluster in this call
                if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                        updated = {'_admin.helm_charts_added.' +
                                   item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                                         "to_add: {}".format(k8s_cluster_id, del_repo_list,
                                                                             added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                    updated_cluster_list.append(cluster_uuid)

                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)

                # "kdu-instance" is written as None here, before the install task is launched
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel}
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # location at database of this kdu, passed to the connector
                db_dict = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}

                task = asyncio.ensure_future(
                    self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                                atomic=True, params=desc_params,
                                                                db_dict=db_dict, timeout=600,
                                                                kdu_name=kdur["kdu-name"]))

                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        # keep the original exception as the cause, for debugging
        raise LcmException(msg) from e
    finally:
        # write whatever is still pending to be stored
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2187
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch instantiate_N2VC in an asyncio task and register the task object.
    The entry of this charm is looked for at db_nsr["_admin"]["deployed"]["VCA"]; an entry matches when
    member-vnf-index, vdu_id, kdu_name and vdu_count_index are the same. If no entry matches, a new one is
    appended to that list and written to the "nsrs" database record together with an empty
    configurationStatus entry at the same index.
    :param task_instantiation_info: dictionary where the created task is added, with a description as value
    :return: None
    """
    deployed_vcas = db_nsr["_admin"]["deployed"]["VCA"]
    wanted = (member_vnf_index, vdu_id, kdu_name, vdu_index)

    # position of the first non-empty entry describing this very same charm, if any
    vca_index = next(
        (position for position, entry in enumerate(deployed_vcas)
         if entry and (entry.get("member-vnf-index"), entry.get("vdu_id"), entry.get("kdu_name"),
                       entry.get("vdu_count_index", 0)) == wanted),
        None)

    if vca_index is None:
        # not found, create one at the end of the list
        vca_index = len(deployed_vcas)
        vca_deployed = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",  # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }

        # create VCA and configurationStatus in db
        self.update_db_2("nsrs", nsr_id, {
            "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
            "configurationStatus.{}".format(vca_index): dict()
        })

        deployed_vcas.append(vca_deployed)

    # Launch task
    n2vc_coroutine = self.instantiate_N2VC(
        logging_text=logging_text,
        vca_index=vca_index,
        nsi_id=nsi_id,
        db_nsr=db_nsr,
        db_vnfr=db_vnfr,
        vdu_id=vdu_id,
        kdu_name=kdu_name,
        vdu_index=vdu_index,
        deploy_params=deploy_params,
        config_descriptor=descriptor_config,
        base_folder=base_folder,
        nslcmop_id=nslcmop_id,
        stage=stage
    )
    task_n2vc = asyncio.ensure_future(n2vc_coroutine)
    task_name = "instantiate_N2VC-{}".format(vca_index)
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, task_name, task_n2vc)
    task_description = self.task_name_deploy_vca + " {}.{}".format(member_vnf_index or "", vdu_id or "")
    task_instantiation_info[task_n2vc] = task_description
2250
# Tell whether this VNFD declares terminate primitives
def _has_terminate_config_primitive(self, vnfd):
    """
    :param vnfd: vnfd content
    :return: True if "vnf-configuration" exists and contains a non-empty "terminate-config-primitive";
        False otherwise
    """
    terminate_primitives = (vnfd.get("vnf-configuration") or {}).get("terminate-config-primitive")
    return bool(terminate_primitives)
2258
@staticmethod
def _get_terminate_config_primitive_seq_list(vnfd):
    """
    Get the terminate primitives of this VNFD ordered by its 'seq', numerically and ascending.
    The vnfd must contain "vnf-configuration" with "terminate-config-primitive"; this is not checked here.
    :param vnfd: vnfd content
    :return: a new sorted list; the list at the vnfd is not modified
    """
    ordered_primitives = list(vnfd.get("vnf-configuration").get("terminate-config-primitive"))
    # 'seq' is converted to integer, so that 10 goes after 2
    ordered_primitives.sort(key=lambda primitive: int(primitive['seq']))
    return ordered_primitives
2268
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
    """
    Builds the content of a ns-lcm-op to be stored at database. The operation gets a new random uuid and
    starts at state "PROCESSING".
    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    :raises LcmException: if any of the three arguments is empty
    """
    if not nsr_id or not operation or not params:
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")

    timestamp = time()
    op_id = str(uuid4())
    links = {
        "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + op_id,
        "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
    }
    return {
        "id": op_id,
        "_id": op_id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": timestamp,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": timestamp,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": links,
    }
2302
def _format_additional_params(self, params):
    """
    Replace every value whose text starts with "!!yaml " by the result of loading, with yaml.safe_load,
    the text that follows that prefix.
    :param params: dictionary of additional params, or None. It is modified in place
    :return: the same dictionary received, or a new empty dictionary if params is None or empty
    """
    yaml_mark = "!!yaml "
    if not params:
        return {}
    # keys are collected first; values are replaced afterwards, in the same order
    marked_keys = [key for key, value in params.items() if str(value).startswith(yaml_mark)]
    for key in marked_keys:
        params[key] = yaml.safe_load(params[key][len(yaml_mark):])
    return params
2309
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Build the parameters for executing a terminate primitive, by means of self._map_primitive_params.
    :param seq: terminate primitive descriptor; its 'name' is used as the primitive name
    :param vnf_index: member vnf index where the primitive applies
    :return: what self._map_primitive_params returns for this primitive, with empty user primitive_params
        and empty descriptor params
    """
    action_params = dict(
        member_vnf_index=vnf_index,
        primitive=seq.get('name'),
        primitive_params={},
    )
    return self._map_primitive_params(seq, action_params, {})
2320
2321 # sub-operations
2322
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide what to do with an already existing sub-operation.
    :param db_nslcmop: nslcmop record, with the sub-operations at _admin.operations
    :param op_index: index of the sub-operation at that list
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'; otherwise op_index, after
        setting the sub-operation status to 'PROCESSING'
    """
    sub_operations = deep_get(db_nslcmop, ('_admin', 'operations'), [])
    if sub_operations[op_index].get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. Retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
2341
# Look for the first sub-operation whose content is equal to 'match' for every key of 'match'
def _find_suboperation(self, db_nslcmop, match):
    """
    :param db_nslcmop: nslcmop record, with the sub-operations at _admin.operations
    :param match: dictionary with the keys and values that the sub-operation must have
    :return: index of the first matching sub-operation; SUBOPERATION_STATUS_NOT_FOUND if there is no match
        or if db_nslcmop or match are empty
    """
    if not db_nslcmop or not match:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    sub_operations = db_nslcmop.get('_admin', {}).get('operations', [])
    for position, sub_operation in enumerate(sub_operations):
        differs = any(sub_operation.get(key) != wanted for key, wanted in match.items())
        if not differs:
            return position
    return self.SUBOPERATION_STATUS_NOT_FOUND
2351
# Write at database the state and the detailed status of the sub-operation stored at 'op_index'
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    :param db_nslcmop: nslcmop record; only its '_id' is used
    :param op_index: index of the sub-operation at _admin.operations
    :param operationState: new value for 'operationState'
    :param detailed_status: new value for 'detailed-status'
    :return: None. The database is called with fail_on_empty=False
    """
    state_path = '_admin.operations.{}.operationState'.format(op_index)
    detail_path = '_admin.operations.{}.detailed-status'.format(op_index)
    # Update DB for HA tasks
    self.db.set_one(
        "nslcmops",
        q_filter={'_id': db_nslcmop['_id']},
        update_dict={state_path: operationState, detail_path: detailed_status},
        fail_on_empty=False)
2362
# Append a sub-operation to the nslcmop and return its index
# State, detailed status and type are optional, and currently set only for 'scale' sub-operations:
#   'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
#   'detailed-status' : status message
#   'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
# They are NOT used for 'terminate' sub-operations.
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                      mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                      RO_nsr_id=None, RO_scaling_info=None):
    """
    :param db_nslcmop: nslcmop record. The new sub-operation is added to its _admin.operations list
    :param vdu_name: not stored at the sub-operation
    :return: index of the added sub-operation; SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    new_op = {'member_vnf_index': vnf_index,
              'vdu_id': vdu_id,
              'vdu_count_index': vdu_count_index,
              'primitive': primitive,
              'primitive_params': mapped_primitive_params}
    # optional fields are stored only when they have content
    optional_fields = (
        ('operationState', operationState),
        ('detailed-status', detailed_status),
        ('lcmOperationType', operationType),
        ('RO_nsr_id', RO_nsr_id),
        ('RO_scaling_info', RO_scaling_info),
    )
    new_op.update({field: content for field, content in optional_fields if content})

    # Create or append to the "_admin.operations" list
    admin_section = db_nslcmop.get('_admin', {})
    op_list = admin_section.get('operations')
    if op_list:
        # Existing operations, append operation to list
        op_list.append(new_op)
    else:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = [new_op]
        admin_section['operations'] = op_list

    self.update_db_2("nslcmops", db_nslcmop['_id'], {'_admin.operations': op_list})
    return len(op_list) - 1
2406
2407 # Helper methods for scale() sub-operations
2408
# pre-scale/post-scale/RO scaling. There are 3 different cases:
# a. New: first time execution, the sub-operation is added. Returns SUBOPERATION_STATUS_NEW
# b. Skip: the sub-operation exists with operationState == 'COMPLETED'. Returns SUBOPERATION_STATUS_SKIP
# c. Retry: the sub-operation exists with operationState != 'COMPLETED'. Returns op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    :param db_nslcmop: nslcmop record, with the sub-operations at _admin.operations
    :param operationType: type of sub-operation. Replaced by 'SCALE-RO' when both RO_nsr_id and
        RO_scaling_info are provided
    :return: SUBOPERATION_STATUS_NEW, or what self._retry_or_skip_suboperation returns for the existing
        sub-operation
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(db_nslcmop,
                           vnf_index,
                           None,  # vdu_id: None for all kind of scaling
                           None,  # vdu_count_index: None for all kind of scaling
                           None,  # vdu_name: None for all kind of scaling
                           vnf_config_primitive,
                           primitive_params,
                           'PROCESSING',  # initial status for sub-operation
                           'In progress',
                           operationType,
                           RO_nsr_id,
                           RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2467
2468 # Function to return execution_environment id
2469
2470 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2471 # TODO vdu_index_count
2472 for vca in vca_deployed_list:
2473 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2474 return vca["ee_id"]
2475
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop database record; sub-operations are added to it and its "nsInstanceId" is used
        to update the nsr
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. It can be None,
        in which case no terminate primitive is executed
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once later
    :return: None or exception (LcmException if a terminate primitive does not end as COMPLETED or
        PARTIALLY_COMPLETED)
    """
    # execute terminate_primitives
    # config_descriptor may be missing (e.g. vdu/kdu not found at the descriptor); nothing to execute then
    terminate_primitives = (config_descriptor or {}).get("terminate-config-primitive")
    vdu_id = vca_deployed.get("vdu_id")
    vdu_count_index = vca_deployed.get("vdu_count_index")
    vdu_name = vca_deployed.get("vdu_name")
    vnf_index = vca_deployed.get("member-vnf-index")
    if terminate_primitives and vca_deployed.get("needed_terminate"):
        # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
        terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
        for seq in terminate_primitives:
            # For each sequence in list, get primitive and call _ns_execute_primitive()
            step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                vnf_index, seq.get("name"))
            self.logger.debug(logging_text + step)
            # Create the primitive for each sequence, i.e. "primitive": "touch"
            primitive = seq.get('name')
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

            # Add sub-operation; vdu_id, vdu_count_index and vdu_name come from the deployed VCA record
            self._add_suboperation(db_nslcmop,
                                   vnf_index,
                                   vdu_id,
                                   vdu_count_index,
                                   vdu_name,
                                   primitive,
                                   mapped_primitive_params)
            # Sub-operations: Call _ns_execute_primitive() instead of action()
            try:
                result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                         mapped_primitive_params)
            except LcmException:
                # this happens when VCA is not deployed. In this case it is not needed to terminate
                continue
            result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
            if result not in result_ok:
                raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                   "error {}".format(seq.get("name"), vnf_index, result_detail))
        # set that this VCA do not need terminated
        db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
        self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if destroy_ee:
        await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2532
2533 async def _delete_all_N2VC(self, db_nsr: dict):
2534 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2535 namespace = "." + db_nsr["_id"]
2536 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2537 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2538
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO: deletes the ns, waits until the delete action finishes and then deletes
    the nsd and the vnfds. Progress and results are written at the nsr and nslcmop database records.
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: nsr identifier, used to update the "nsrs" collection
    :param nslcmop_id: nslcmop identifier, used to write the operation status
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with all the collected failures if something cannot be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                 ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # raised explicitly (not asserted) so that the check is not removed when running with -O
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    # write at database only when the reported status changes
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no 'loop' argument: it was removed from asyncio.sleep at python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd. Skipped if the ns deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds. Skipped if any previous deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2681
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance. It is done in three stages: preparing the task, executing the terminate
    primitives of the execution environments that need it, and deleting everything (execution environments,
    KDUs and the deployment at RO). The final result is written at the nsr and nslcmop database records and
    notified with a "terminated" message at the "ns" topic.
    :param nsr_id: identifier of the nsr to terminate
    :param nslcmop_id: identifier of the nslcmop that requested the termination
    :return: None. Errors are not raised, they are recorded at the database records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to remove; the final status is written at the finally block
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        if not operation_params.get("skip_terminate_primitives"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
                    continue
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                # destroy_ee=False: execution environments are deleted all at once at stage 3
                task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
                                                               vca_index, False))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                # These tasks have already been waited for. Forget them before leaving; otherwise the finally
                # block would wait for the same tasks again and would report their errors twice
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."
        stage[1] = "Deleting all execution environments."
        self.logger.debug(logging_text + stage[1])

        task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                timeout=self.timeout_charm_delete))
        # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
        tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        task_delete_ro = asyncio.ensure_future(
            self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as e:
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        if db_nslcmop:
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2896
2897 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
2898 time_start = time()
2899 error_detail_list = []
2900 error_list = []
2901 pending_tasks = list(created_tasks_info.keys())
2902 num_tasks = len(pending_tasks)
2903 num_done = 0
2904 stage[1] = "{}/{}.".format(num_done, num_tasks)
2905 self._write_op_status(nslcmop_id, stage)
2906 while pending_tasks:
2907 new_error = None
2908 _timeout = timeout + time_start - time()
2909 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
2910 return_when=asyncio.FIRST_COMPLETED)
2911 num_done += len(done)
2912 if not done: # Timeout
2913 for task in pending_tasks:
2914 new_error = created_tasks_info[task] + ": Timeout"
2915 error_detail_list.append(new_error)
2916 error_list.append(new_error)
2917 break
2918 for task in done:
2919 if task.cancelled():
2920 exc = "Cancelled"
2921 else:
2922 exc = task.exception()
2923 if exc:
2924 if isinstance(exc, asyncio.TimeoutError):
2925 exc = "Timeout"
2926 new_error = created_tasks_info[task] + ": {}".format(exc)
2927 error_list.append(created_tasks_info[task])
2928 error_detail_list.append(new_error)
2929 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
2930 self.logger.error(logging_text + new_error)
2931 else:
2932 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
2933 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
2934 else:
2935 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
2936 stage[1] = "{}/{}.".format(num_done, num_tasks)
2937 if new_error:
2938 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
2939 if nsr_id: # update also nsr
2940 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
2941 "errorDetail": ". ".join(error_detail_list)})
2942 self._write_op_status(nslcmop_id, stage)
2943 return error_detail_list
2944
2945 @staticmethod
2946 def _map_primitive_params(primitive_desc, params, instantiation_params):
2947 """
2948 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2949 The default-value is used. If it is between < > it look for a value at instantiation_params
2950 :param primitive_desc: portion of VNFD/NSD that describes primitive
2951 :param params: Params provided by user
2952 :param instantiation_params: Instantiation params provided by user
2953 :return: a dictionary with the calculated params
2954 """
2955 calculated_params = {}
2956 for parameter in primitive_desc.get("parameter", ()):
2957 param_name = parameter["name"]
2958 if param_name in params:
2959 calculated_params[param_name] = params[param_name]
2960 elif "default-value" in parameter or "value" in parameter:
2961 if "value" in parameter:
2962 calculated_params[param_name] = parameter["value"]
2963 else:
2964 calculated_params[param_name] = parameter["default-value"]
2965 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2966 and calculated_params[param_name].endswith(">"):
2967 if calculated_params[param_name][1:-1] in instantiation_params:
2968 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2969 else:
2970 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2971 format(calculated_params[param_name], primitive_desc["name"]))
2972 else:
2973 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2974 format(param_name, primitive_desc["name"]))
2975
2976 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2977 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2978 width=256)
2979 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2980 calculated_params[param_name] = calculated_params[param_name][7:]
2981
2982 # add always ns_config_info if primitive name is config
2983 if primitive_desc["name"] == "config":
2984 if "ns_config_info" in instantiation_params:
2985 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2986 return calculated_params
2987
2988 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
2989 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
2990 for vca in deployed_vca:
2991 if not vca:
2992 continue
2993 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
2994 continue
2995 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
2996 continue
2997 if kdu_name and kdu_name != vca["kdu_name"]:
2998 continue
2999 break
3000 else:
3001 # vca_deployed not found
3002 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3003 "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3004
3005 # get ee_id
3006 ee_id = vca.get("ee_id")
3007 if not ee_id:
3008 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3009 "execution environment"
3010 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3011 return ee_id
3012
3013 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3014 retries_interval=30, timeout=None) -> (str, str):
3015 try:
3016 if primitive == "config":
3017 primitive_params = {"params": primitive_params}
3018
3019 while retries >= 0:
3020 try:
3021 output = await asyncio.wait_for(
3022 self.n2vc.exec_primitive(
3023 ee_id=ee_id,
3024 primitive_name=primitive,
3025 params_dict=primitive_params,
3026 progress_timeout=self.timeout_progress_primitive,
3027 total_timeout=self.timeout_primitive),
3028 timeout=timeout or self.timeout_primitive)
3029 # execution was OK
3030 break
3031 except asyncio.CancelledError:
3032 raise
3033 except Exception as e: # asyncio.TimeoutError
3034 if isinstance(e, asyncio.TimeoutError):
3035 e = "Timeout"
3036 retries -= 1
3037 if retries >= 0:
3038 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3039 # wait and retry
3040 await asyncio.sleep(retries_interval, loop=self.loop)
3041 else:
3042 return 'FAILED', str(e)
3043
3044 return 'COMPLETED', output
3045
3046 except (LcmException, asyncio.CancelledError):
3047 raise
3048 except Exception as e:
3049 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3050
3051 async def action(self, nsr_id, nslcmop_id):
3052
3053 # Try to lock HA task here
3054 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3055 if not task_is_locked_by_me:
3056 return
3057
3058 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3059 self.logger.debug(logging_text + "Enter")
3060 # get all needed from database
3061 db_nsr = None
3062 db_nslcmop = None
3063 db_nsr_update = {}
3064 db_nslcmop_update = {}
3065 nslcmop_operation_state = None
3066 error_description_nslcmop = None
3067 exc = None
3068 try:
3069 # wait for any previous tasks in process
3070 step = "Waiting for previous operations to terminate"
3071 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3072
3073 self._write_ns_status(
3074 nsr_id=nsr_id,
3075 ns_state=None,
3076 current_operation="RUNNING ACTION",
3077 current_operation_id=nslcmop_id
3078 )
3079
3080 step = "Getting information from database"
3081 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3082 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3083
3084 nsr_deployed = db_nsr["_admin"].get("deployed")
3085 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3086 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3087 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3088 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3089 primitive = db_nslcmop["operationParams"]["primitive"]
3090 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3091 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3092
3093 if vnf_index:
3094 step = "Getting vnfr from database"
3095 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3096 step = "Getting vnfd from database"
3097 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3098 else:
3099 step = "Getting nsd from database"
3100 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3101
3102 # for backward compatibility
3103 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3104 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3105 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3106 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3107
3108 # look for primitive
3109 config_primitive_desc = None
3110 if vdu_id:
3111 for vdu in get_iterable(db_vnfd, "vdu"):
3112 if vdu_id == vdu["id"]:
3113 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3114 if config_primitive["name"] == primitive:
3115 config_primitive_desc = config_primitive
3116 break
3117 break
3118 elif kdu_name:
3119 for kdu in get_iterable(db_vnfd, "kdu"):
3120 if kdu_name == kdu["name"]:
3121 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3122 if config_primitive["name"] == primitive:
3123 config_primitive_desc = config_primitive
3124 break
3125 break
3126 elif vnf_index:
3127 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3128 if config_primitive["name"] == primitive:
3129 config_primitive_desc = config_primitive
3130 break
3131 else:
3132 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3133 if config_primitive["name"] == primitive:
3134 config_primitive_desc = config_primitive
3135 break
3136
3137 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3138 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3139 format(primitive))
3140
3141 if vnf_index:
3142 if vdu_id:
3143 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3144 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3145 elif kdu_name:
3146 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3147 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3148 else:
3149 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3150 else:
3151 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3152
3153 # TODO check if ns is in a proper status
3154 if kdu_name and primitive in ("upgrade", "rollback", "status"):
3155 # kdur and desc_params already set from before
3156 if primitive_params:
3157 desc_params.update(primitive_params)
3158 # TODO Check if we will need something at vnf level
3159 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3160 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3161 break
3162 else:
3163 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3164
3165 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3166 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3167 raise LcmException(msg)
3168
3169 db_dict = {"collection": "nsrs",
3170 "filter": {"_id": nsr_id},
3171 "path": "_admin.deployed.K8s.{}".format(index)}
3172 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3173 step = "Executing kdu {}".format(primitive)
3174 if primitive == "upgrade":
3175 if desc_params.get("kdu_model"):
3176 kdu_model = desc_params.get("kdu_model")
3177 del desc_params["kdu_model"]
3178 else:
3179 kdu_model = kdu.get("kdu-model")
3180 parts = kdu_model.split(sep=":")
3181 if len(parts) == 2:
3182 kdu_model = parts[0]
3183
3184 detailed_status = await asyncio.wait_for(
3185 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3186 cluster_uuid=kdu.get("k8scluster-uuid"),
3187 kdu_instance=kdu.get("kdu-instance"),
3188 atomic=True, kdu_model=kdu_model,
3189 params=desc_params, db_dict=db_dict,
3190 timeout=timeout_ns_action),
3191 timeout=timeout_ns_action + 10)
3192 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3193 elif primitive == "rollback":
3194 detailed_status = await asyncio.wait_for(
3195 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3196 cluster_uuid=kdu.get("k8scluster-uuid"),
3197 kdu_instance=kdu.get("kdu-instance"),
3198 db_dict=db_dict),
3199 timeout=timeout_ns_action)
3200 elif primitive == "status":
3201 detailed_status = await asyncio.wait_for(
3202 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3203 cluster_uuid=kdu.get("k8scluster-uuid"),
3204 kdu_instance=kdu.get("kdu-instance")),
3205 timeout=timeout_ns_action)
3206
3207 if detailed_status:
3208 nslcmop_operation_state = 'COMPLETED'
3209 else:
3210 detailed_status = ''
3211 nslcmop_operation_state = 'FAILED'
3212
3213 else:
3214 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3215 self._look_for_deployed_vca(nsr_deployed["VCA"],
3216 member_vnf_index=vnf_index,
3217 vdu_id=vdu_id,
3218 vdu_count_index=vdu_count_index),
3219 primitive=primitive,
3220 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3221 timeout=timeout_ns_action)
3222
3223 db_nslcmop_update["detailed-status"] = detailed_status
3224 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3225 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3226 detailed_status))
3227 return # database update is called inside finally
3228
3229 except (DbException, LcmException, N2VCException) as e:
3230 self.logger.error(logging_text + "Exit Exception {}".format(e))
3231 exc = e
3232 except asyncio.CancelledError:
3233 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3234 exc = "Operation was cancelled"
3235 except asyncio.TimeoutError:
3236 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3237 exc = "Timeout"
3238 except Exception as e:
3239 exc = traceback.format_exc()
3240 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3241 finally:
3242 if exc:
3243 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3244 "FAILED {}: {}".format(step, exc)
3245 nslcmop_operation_state = "FAILED"
3246 if db_nsr:
3247 self._write_ns_status(
3248 nsr_id=nsr_id,
3249 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3250 current_operation="IDLE",
3251 current_operation_id=None,
3252 # error_description=error_description_nsr,
3253 # error_detail=error_detail,
3254 other_update=db_nsr_update
3255 )
3256
3257 if db_nslcmop:
3258 self._write_op_status(
3259 op_id=nslcmop_id,
3260 stage="",
3261 error_message=error_description_nslcmop,
3262 operation_state=nslcmop_operation_state,
3263 other_update=db_nslcmop_update,
3264 )
3265
3266 if nslcmop_operation_state:
3267 try:
3268 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3269 "operationState": nslcmop_operation_state},
3270 loop=self.loop)
3271 except Exception as e:
3272 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3273 self.logger.debug(logging_text + "Exit")
3274 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3275 return nslcmop_operation_state, detailed_status
3276
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute the scale operation 'nslcmop_id' over the NS instance 'nsr_id'.

    The work is done in sequential phases; the current phase is kept at 'step' so that any failure is reported
    as "FAILED <step>: <error>":
      1. take the HA lock (returns at once when another worker owns the task) and wait for related operations
      2. read nslcmop, nsr, vnfr and vnfd from database and locate the requested scaling-group-descriptor
      3. check 'nb-scale-op' against max-instance-count (SCALE_OUT) or min-instance-count (SCALE_IN)
      4. run the pre-scale scaling-config-action primitives
      5. send the "vdu-scaling" action to RO and poll it (every 5 seconds, up to one hour)
      6. run the post-scale scaling-config-action primitives
      7. finally: write operation and ns status to database, publish "scaled" at topic "ns", remove the task

    :param nsr_id: '_id' of the record at 'nsrs' collection
    :param nslcmop_id: '_id' of the record at 'nslcmops' collection
    :return: None. The outcome is written to database and message bus; exceptions are caught and logged here
    """

    def _vdur_scale_entry(vdur):
        # Entry added to VDU_SCALE_INFO["vdu"]: identification of the vdur plus the addresses of its interfaces
        return {
            "name": vdur["name"],
            "vdu_id": vdur["vdu-id-ref"],
            "interface": [
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
                for interface in vdur["interfaces"]
            ],
        }

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # previous values are kept to restore them when the operation ends
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # NOTE: the cooldown-time of the scaling-policy is not applied by this method

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy: the counters are decremented while choosing, from the end, the vdur to delete
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append(_vdur_scale_entry(vdur))
            # the untouched counters are the ones passed later to scale_vnfr
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable may be unbound or unrelated here
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    # plain text: no primitive is involved at RO sub-operation
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            # explicit raise instead of assert: asserts are removed when running with -O
                            raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                    else:
                        # ns_status is not refreshed at this branch; it keeps the last value of the action check
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet: try again at next iteration
                                pass
                        else:
                            raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    # 'loop' argument is not passed: it is deprecated since python 3.8 and removed at 3.10
                    await asyncio.sleep(5)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same arguments as the rest of calls to _update_suboperation_status at this method
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append(_vdur_scale_entry(vdur))
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable may be unbound or unrelated here
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                        member_vnf_index=vnf_index,
                                                        vdu_id=None,
                                                        vdu_count_index=None),
                            vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process tells which phase was running, to mark only that part as failed
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                # best effort notification: a failure here must not hide the result of the operation
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")