allow k8s namespace parameter
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
47 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
48 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
49 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
50 timeout_charm_delete = 10 * 60
51 timeout_primitive = 10 * 60 # timeout for primitive execution
52 timeout_progress_primitive = 2 * 60 # timeout for some progress in a primitive execution
53
54 SUBOPERATION_STATUS_NOT_FOUND = -1
55 SUBOPERATION_STATUS_NEW = -2
56 SUBOPERATION_STATUS_SKIP = -3
57 task_name_deploy_vca = "Deploying VCA"
58
59 def __init__(self, db, msg, fs, lcm_tasks, config, loop):
60 """
61 Init, Connect to database, filesystem storage, and messaging
62 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
63 :return: None
64 """
65 super().__init__(
66 db=db,
67 msg=msg,
68 fs=fs,
69 logger=logging.getLogger('lcm.ns')
70 )
71
72 self.loop = loop
73 self.lcm_tasks = lcm_tasks
74 self.timeout = config["timeout"]
75 self.ro_config = config["ro_config"]
76 self.vca_config = config["VCA"].copy()
77
78 # create N2VC connector
79 self.n2vc = N2VCJujuConnector(
80 db=self.db,
81 fs=self.fs,
82 log=self.logger,
83 loop=self.loop,
84 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
85 username=self.vca_config.get('user', None),
86 vca_config=self.vca_config,
87 on_update_db=self._on_update_n2vc_db
88 )
89
90 self.k8sclusterhelm = K8sHelmConnector(
91 kubectl_command=self.vca_config.get("kubectlpath"),
92 helm_command=self.vca_config.get("helmpath"),
93 fs=self.fs,
94 log=self.logger,
95 db=self.db,
96 on_update_db=None,
97 )
98
99 self.k8sclusterjuju = K8sJujuConnector(
100 kubectl_command=self.vca_config.get("kubectlpath"),
101 juju_command=self.vca_config.get("jujupath"),
102 fs=self.fs,
103 log=self.logger,
104 db=self.db,
105 on_update_db=None,
106 )
107
108 self.k8scluster_map = {
109 "helm-chart": self.k8sclusterhelm,
110 "chart": self.k8sclusterhelm,
111 "juju-bundle": self.k8sclusterjuju,
112 "juju": self.k8sclusterjuju,
113 }
114 # create RO client
115 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
116
117 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
118
119 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
120
121 try:
122 # TODO filter RO descriptor fields...
123
124 # write to database
125 db_dict = dict()
126 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
127 db_dict['deploymentStatus'] = ro_descriptor
128 self.update_db_2("nsrs", nsrs_id, db_dict)
129
130 except Exception as e:
131 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
132
133 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
134
135 # remove last dot from path (if exists)
136 if path.endswith('.'):
137 path = path[:-1]
138
139 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
140 # .format(table, filter, path, updated_data))
141
142 try:
143
144 nsr_id = filter.get('_id')
145
146 # read ns record from database
147 nsr = self.db.get_one(table='nsrs', q_filter=filter)
148 current_ns_status = nsr.get('nsState')
149
150 # get vca status for NS
151 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
152
153 # vcaStatus
154 db_dict = dict()
155 db_dict['vcaStatus'] = status_dict
156
157 # update configurationStatus for this VCA
158 try:
159 vca_index = int(path[path.rfind(".")+1:])
160
161 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
162 vca_status = vca_list[vca_index].get('status')
163
164 configuration_status_list = nsr.get('configurationStatus')
165 config_status = configuration_status_list[vca_index].get('status')
166
167 if config_status == 'BROKEN' and vca_status != 'failed':
168 db_dict['configurationStatus'][vca_index] = 'READY'
169 elif config_status != 'BROKEN' and vca_status == 'failed':
170 db_dict['configurationStatus'][vca_index] = 'BROKEN'
171 except Exception as e:
172 # not update configurationStatus
173 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
174
175 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
176 # if nsState = 'DEGRADED' check if all is OK
177 is_degraded = False
178 if current_ns_status in ('READY', 'DEGRADED'):
179 error_description = ''
180 # check machines
181 if status_dict.get('machines'):
182 for machine_id in status_dict.get('machines'):
183 machine = status_dict.get('machines').get(machine_id)
184 # check machine agent-status
185 if machine.get('agent-status'):
186 s = machine.get('agent-status').get('status')
187 if s != 'started':
188 is_degraded = True
189 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
190 # check machine instance status
191 if machine.get('instance-status'):
192 s = machine.get('instance-status').get('status')
193 if s != 'running':
194 is_degraded = True
195 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
196 # check applications
197 if status_dict.get('applications'):
198 for app_id in status_dict.get('applications'):
199 app = status_dict.get('applications').get(app_id)
200 # check application status
201 if app.get('status'):
202 s = app.get('status').get('status')
203 if s != 'active':
204 is_degraded = True
205 error_description += 'application {} status={} ; '.format(app_id, s)
206
207 if error_description:
208 db_dict['errorDescription'] = error_description
209 if current_ns_status == 'READY' and is_degraded:
210 db_dict['nsState'] = 'DEGRADED'
211 if current_ns_status == 'DEGRADED' and not is_degraded:
212 db_dict['nsState'] = 'READY'
213
214 # write to database
215 self.update_db_2("nsrs", nsr_id, db_dict)
216
217 except (asyncio.CancelledError, asyncio.TimeoutError):
218 raise
219 except Exception as e:
220 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
221
222 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
223 """
224 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
225 :param vnfd: input vnfd
226 :param new_id: overrides vnf id if provided
227 :param additionalParams: Instantiation params for VNFs provided
228 :param nsrId: Id of the NSR
229 :return: copy of vnfd
230 """
231 try:
232 vnfd_RO = deepcopy(vnfd)
233 # remove unused by RO configuration, monitoring, scaling and internal keys
234 vnfd_RO.pop("_id", None)
235 vnfd_RO.pop("_admin", None)
236 vnfd_RO.pop("vnf-configuration", None)
237 vnfd_RO.pop("monitoring-param", None)
238 vnfd_RO.pop("scaling-group-descriptor", None)
239 vnfd_RO.pop("kdu", None)
240 vnfd_RO.pop("k8s-cluster", None)
241 if new_id:
242 vnfd_RO["id"] = new_id
243
244 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
245 for vdu in get_iterable(vnfd_RO, "vdu"):
246 cloud_init_file = None
247 if vdu.get("cloud-init-file"):
248 base_folder = vnfd["_admin"]["storage"]
249 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
250 vdu["cloud-init-file"])
251 with self.fs.file_open(cloud_init_file, "r") as ci_file:
252 cloud_init_content = ci_file.read()
253 vdu.pop("cloud-init-file", None)
254 elif vdu.get("cloud-init"):
255 cloud_init_content = vdu["cloud-init"]
256 else:
257 continue
258
259 env = Environment()
260 ast = env.parse(cloud_init_content)
261 mandatory_vars = meta.find_undeclared_variables(ast)
262 if mandatory_vars:
263 for var in mandatory_vars:
264 if not additionalParams or var not in additionalParams.keys():
265 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
266 "file, must be provided in the instantiation parameters inside the "
267 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
268 template = Template(cloud_init_content)
269 cloud_init_content = template.render(additionalParams or {})
270 vdu["cloud-init"] = cloud_init_content
271
272 return vnfd_RO
273 except FsException as e:
274 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
275 format(vnfd["id"], vdu["id"], cloud_init_file, e))
276 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
277 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
278 format(vnfd["id"], vdu["id"], e))
279
280 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
281 """
282 Creates a RO ns descriptor from OSM ns_instantiate params
283 :param ns_params: OSM instantiate params
284 :return: The RO ns descriptor
285 """
286 vim_2_RO = {}
287 wim_2_RO = {}
288 # TODO feature 1417: Check that no instantiation is set over PDU
289 # check if PDU forces a concrete vim-network-id and add it
290 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
291
292 def vim_account_2_RO(vim_account):
293 if vim_account in vim_2_RO:
294 return vim_2_RO[vim_account]
295
296 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
297 if db_vim["_admin"]["operationalState"] != "ENABLED":
298 raise LcmException("VIM={} is not available. operationalState={}".format(
299 vim_account, db_vim["_admin"]["operationalState"]))
300 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
301 vim_2_RO[vim_account] = RO_vim_id
302 return RO_vim_id
303
304 def wim_account_2_RO(wim_account):
305 if isinstance(wim_account, str):
306 if wim_account in wim_2_RO:
307 return wim_2_RO[wim_account]
308
309 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
310 if db_wim["_admin"]["operationalState"] != "ENABLED":
311 raise LcmException("WIM={} is not available. operationalState={}".format(
312 wim_account, db_wim["_admin"]["operationalState"]))
313 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
314 wim_2_RO[wim_account] = RO_wim_id
315 return RO_wim_id
316 else:
317 return wim_account
318
319 def ip_profile_2_RO(ip_profile):
320 RO_ip_profile = deepcopy((ip_profile))
321 if "dns-server" in RO_ip_profile:
322 if isinstance(RO_ip_profile["dns-server"], list):
323 RO_ip_profile["dns-address"] = []
324 for ds in RO_ip_profile.pop("dns-server"):
325 RO_ip_profile["dns-address"].append(ds['address'])
326 else:
327 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
328 if RO_ip_profile.get("ip-version") == "ipv4":
329 RO_ip_profile["ip-version"] = "IPv4"
330 if RO_ip_profile.get("ip-version") == "ipv6":
331 RO_ip_profile["ip-version"] = "IPv6"
332 if "dhcp-params" in RO_ip_profile:
333 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
334 return RO_ip_profile
335
336 if not ns_params:
337 return None
338 RO_ns_params = {
339 # "name": ns_params["nsName"],
340 # "description": ns_params.get("nsDescription"),
341 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
342 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
343 # "scenario": ns_params["nsdId"],
344 }
345
346 n2vc_key_list = n2vc_key_list or []
347 for vnfd_ref, vnfd in vnfd_dict.items():
348 vdu_needed_access = []
349 mgmt_cp = None
350 if vnfd.get("vnf-configuration"):
351 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
352 if ssh_required and vnfd.get("mgmt-interface"):
353 if vnfd["mgmt-interface"].get("vdu-id"):
354 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
355 elif vnfd["mgmt-interface"].get("cp"):
356 mgmt_cp = vnfd["mgmt-interface"]["cp"]
357
358 for vdu in vnfd.get("vdu", ()):
359 if vdu.get("vdu-configuration"):
360 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
361 if ssh_required:
362 vdu_needed_access.append(vdu["id"])
363 elif mgmt_cp:
364 for vdu_interface in vdu.get("interface"):
365 if vdu_interface.get("external-connection-point-ref") and \
366 vdu_interface["external-connection-point-ref"] == mgmt_cp:
367 vdu_needed_access.append(vdu["id"])
368 mgmt_cp = None
369 break
370
371 if vdu_needed_access:
372 for vnf_member in nsd.get("constituent-vnfd"):
373 if vnf_member["vnfd-id-ref"] != vnfd_ref:
374 continue
375 for vdu in vdu_needed_access:
376 populate_dict(RO_ns_params,
377 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
378 n2vc_key_list)
379
380 if ns_params.get("vduImage"):
381 RO_ns_params["vduImage"] = ns_params["vduImage"]
382
383 if ns_params.get("ssh_keys"):
384 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
385 for vnf_params in get_iterable(ns_params, "vnf"):
386 for constituent_vnfd in nsd["constituent-vnfd"]:
387 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
388 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
389 break
390 else:
391 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
392 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
393 if vnf_params.get("vimAccountId"):
394 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
395 vim_account_2_RO(vnf_params["vimAccountId"]))
396
397 for vdu_params in get_iterable(vnf_params, "vdu"):
398 # TODO feature 1417: check that this VDU exist and it is not a PDU
399 if vdu_params.get("volume"):
400 for volume_params in vdu_params["volume"]:
401 if volume_params.get("vim-volume-id"):
402 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
403 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
404 volume_params["vim-volume-id"])
405 if vdu_params.get("interface"):
406 for interface_params in vdu_params["interface"]:
407 if interface_params.get("ip-address"):
408 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
409 vdu_params["id"], "interfaces", interface_params["name"],
410 "ip_address"),
411 interface_params["ip-address"])
412 if interface_params.get("mac-address"):
413 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
414 vdu_params["id"], "interfaces", interface_params["name"],
415 "mac_address"),
416 interface_params["mac-address"])
417 if interface_params.get("floating-ip-required"):
418 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
419 vdu_params["id"], "interfaces", interface_params["name"],
420 "floating-ip"),
421 interface_params["floating-ip-required"])
422
423 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
424 if internal_vld_params.get("vim-network-name"):
425 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
426 internal_vld_params["name"], "vim-network-name"),
427 internal_vld_params["vim-network-name"])
428 if internal_vld_params.get("vim-network-id"):
429 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
430 internal_vld_params["name"], "vim-network-id"),
431 internal_vld_params["vim-network-id"])
432 if internal_vld_params.get("ip-profile"):
433 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
434 internal_vld_params["name"], "ip-profile"),
435 ip_profile_2_RO(internal_vld_params["ip-profile"]))
436 if internal_vld_params.get("provider-network"):
437
438 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
439 internal_vld_params["name"], "provider-network"),
440 internal_vld_params["provider-network"].copy())
441
442 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
443 # look for interface
444 iface_found = False
445 for vdu_descriptor in vnf_descriptor["vdu"]:
446 for vdu_interface in vdu_descriptor["interface"]:
447 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
448 if icp_params.get("ip-address"):
449 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
450 vdu_descriptor["id"], "interfaces",
451 vdu_interface["name"], "ip_address"),
452 icp_params["ip-address"])
453
454 if icp_params.get("mac-address"):
455 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
456 vdu_descriptor["id"], "interfaces",
457 vdu_interface["name"], "mac_address"),
458 icp_params["mac-address"])
459 iface_found = True
460 break
461 if iface_found:
462 break
463 else:
464 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
465 "internal-vld:id-ref={} is not present at vnfd:internal-"
466 "connection-point".format(vnf_params["member-vnf-index"],
467 icp_params["id-ref"]))
468
469 for vld_params in get_iterable(ns_params, "vld"):
470 if "ip-profile" in vld_params:
471 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
472 ip_profile_2_RO(vld_params["ip-profile"]))
473
474 if vld_params.get("provider-network"):
475
476 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
477 vld_params["provider-network"].copy())
478
479 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
480 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
481 wim_account_2_RO(vld_params["wimAccountId"])),
482 if vld_params.get("vim-network-name"):
483 RO_vld_sites = []
484 if isinstance(vld_params["vim-network-name"], dict):
485 for vim_account, vim_net in vld_params["vim-network-name"].items():
486 RO_vld_sites.append({
487 "netmap-use": vim_net,
488 "datacenter": vim_account_2_RO(vim_account)
489 })
490 else: # isinstance str
491 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
492 if RO_vld_sites:
493 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
494
495 if vld_params.get("vim-network-id"):
496 RO_vld_sites = []
497 if isinstance(vld_params["vim-network-id"], dict):
498 for vim_account, vim_net in vld_params["vim-network-id"].items():
499 RO_vld_sites.append({
500 "netmap-use": vim_net,
501 "datacenter": vim_account_2_RO(vim_account)
502 })
503 else: # isinstance str
504 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
505 if RO_vld_sites:
506 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
507 if vld_params.get("ns-net"):
508 if isinstance(vld_params["ns-net"], dict):
509 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
510 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
511 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
512 if "vnfd-connection-point-ref" in vld_params:
513 for cp_params in vld_params["vnfd-connection-point-ref"]:
514 # look for interface
515 for constituent_vnfd in nsd["constituent-vnfd"]:
516 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
517 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
518 break
519 else:
520 raise LcmException(
521 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
522 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
523 match_cp = False
524 for vdu_descriptor in vnf_descriptor["vdu"]:
525 for interface_descriptor in vdu_descriptor["interface"]:
526 if interface_descriptor.get("external-connection-point-ref") == \
527 cp_params["vnfd-connection-point-ref"]:
528 match_cp = True
529 break
530 if match_cp:
531 break
532 else:
533 raise LcmException(
534 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
535 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
536 cp_params["member-vnf-index-ref"],
537 cp_params["vnfd-connection-point-ref"],
538 vnf_descriptor["id"]))
539 if cp_params.get("ip-address"):
540 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
541 vdu_descriptor["id"], "interfaces",
542 interface_descriptor["name"], "ip_address"),
543 cp_params["ip-address"])
544 if cp_params.get("mac-address"):
545 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
546 vdu_descriptor["id"], "interfaces",
547 interface_descriptor["name"], "mac_address"),
548 cp_params["mac-address"])
549 return RO_ns_params
550
551 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
552 # make a copy to do not change
553 vdu_create = copy(vdu_create)
554 vdu_delete = copy(vdu_delete)
555
556 vdurs = db_vnfr.get("vdur")
557 if vdurs is None:
558 vdurs = []
559 vdu_index = len(vdurs)
560 while vdu_index:
561 vdu_index -= 1
562 vdur = vdurs[vdu_index]
563 if vdur.get("pdu-type"):
564 continue
565 vdu_id_ref = vdur["vdu-id-ref"]
566 if vdu_create and vdu_create.get(vdu_id_ref):
567 for index in range(0, vdu_create[vdu_id_ref]):
568 vdur = deepcopy(vdur)
569 vdur["_id"] = str(uuid4())
570 vdur["count-index"] += 1
571 vdurs.insert(vdu_index+1+index, vdur)
572 del vdu_create[vdu_id_ref]
573 if vdu_delete and vdu_delete.get(vdu_id_ref):
574 del vdurs[vdu_index]
575 vdu_delete[vdu_id_ref] -= 1
576 if not vdu_delete[vdu_id_ref]:
577 del vdu_delete[vdu_id_ref]
578 # check all operations are done
579 if vdu_create or vdu_delete:
580 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
581 vdu_create))
582 if vdu_delete:
583 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
584 vdu_delete))
585
586 vnfr_update = {"vdur": vdurs}
587 db_vnfr["vdur"] = vdurs
588 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
589
590 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
591 """
592 Updates database nsr with the RO info for the created vld
593 :param ns_update_nsr: dictionary to be filled with the updated info
594 :param db_nsr: content of db_nsr. This is also modified
595 :param nsr_desc_RO: nsr descriptor from RO
596 :return: Nothing, LcmException is raised on errors
597 """
598
599 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
600 for net_RO in get_iterable(nsr_desc_RO, "nets"):
601 if vld["id"] != net_RO.get("ns_net_osm_id"):
602 continue
603 vld["vim-id"] = net_RO.get("vim_net_id")
604 vld["name"] = net_RO.get("vim_name")
605 vld["status"] = net_RO.get("status")
606 vld["status-detailed"] = net_RO.get("error_msg")
607 ns_update_nsr["vld.{}".format(vld_index)] = vld
608 break
609 else:
610 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
611
612 def set_vnfr_at_error(self, db_vnfrs, error_text):
613 try:
614 for db_vnfr in db_vnfrs.values():
615 vnfr_update = {"status": "ERROR"}
616 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
617 if "status" not in vdur:
618 vdur["status"] = "ERROR"
619 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
620 if error_text:
621 vdur["status-detailed"] = str(error_text)
622 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
623 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
624 except DbException as e:
625 self.logger.error("Cannot update vnf. {}".format(e))
626
627 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
628 """
629 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
630 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
631 :param nsr_desc_RO: nsr descriptor from RO
632 :return: Nothing, LcmException is raised on errors
633 """
634 for vnf_index, db_vnfr in db_vnfrs.items():
635 for vnf_RO in nsr_desc_RO["vnfs"]:
636 if vnf_RO["member_vnf_index"] != vnf_index:
637 continue
638 vnfr_update = {}
639 if vnf_RO.get("ip_address"):
640 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
641 elif not db_vnfr.get("ip-address"):
642 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
643 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
644
645 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
646 vdur_RO_count_index = 0
647 if vdur.get("pdu-type"):
648 continue
649 for vdur_RO in get_iterable(vnf_RO, "vms"):
650 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
651 continue
652 if vdur["count-index"] != vdur_RO_count_index:
653 vdur_RO_count_index += 1
654 continue
655 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
656 if vdur_RO.get("ip_address"):
657 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
658 else:
659 vdur["ip-address"] = None
660 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
661 vdur["name"] = vdur_RO.get("vim_name")
662 vdur["status"] = vdur_RO.get("status")
663 vdur["status-detailed"] = vdur_RO.get("error_msg")
664 for ifacer in get_iterable(vdur, "interfaces"):
665 for interface_RO in get_iterable(vdur_RO, "interfaces"):
666 if ifacer["name"] == interface_RO.get("internal_name"):
667 ifacer["ip-address"] = interface_RO.get("ip_address")
668 ifacer["mac-address"] = interface_RO.get("mac_address")
669 break
670 else:
671 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
672 "from VIM info"
673 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
674 vnfr_update["vdur.{}".format(vdu_index)] = vdur
675 break
676 else:
677 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
678 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
679
680 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
681 for net_RO in get_iterable(nsr_desc_RO, "nets"):
682 if vld["id"] != net_RO.get("vnf_net_osm_id"):
683 continue
684 vld["vim-id"] = net_RO.get("vim_net_id")
685 vld["name"] = net_RO.get("vim_name")
686 vld["status"] = net_RO.get("status")
687 vld["status-detailed"] = net_RO.get("error_msg")
688 vnfr_update["vld.{}".format(vld_index)] = vld
689 break
690 else:
691 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
692 vnf_index, vld["id"]))
693
694 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
695 break
696
697 else:
698 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
699
700 def _get_ns_config_info(self, nsr_id):
701 """
702 Generates a mapping between vnf,vdu elements and the N2VC id
703 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
704 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
705 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
706 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
707 """
708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
709 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
710 mapping = {}
711 ns_config_info = {"osm-config-mapping": mapping}
712 for vca in vca_deployed_list:
713 if not vca["member-vnf-index"]:
714 continue
715 if not vca["vdu_id"]:
716 mapping[vca["member-vnf-index"]] = vca["application"]
717 else:
718 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
719 vca["application"]
720 return ns_config_info
721
722 @staticmethod
723 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
724 """
725 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
726 primitives as verify-ssh-credentials, or config when needed
727 :param desc_primitive_list: information of the descriptor
728 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
729 this element contains a ssh public key
730 :return: The modified list. Can ba an empty list, but always a list
731 """
732 if desc_primitive_list:
733 primitive_list = desc_primitive_list.copy()
734 else:
735 primitive_list = []
736 # look for primitive config, and get the position. None if not present
737 config_position = None
738 for index, primitive in enumerate(primitive_list):
739 if primitive["name"] == "config":
740 config_position = index
741 break
742
743 # for NS, add always a config primitive if not present (bug 874)
744 if not vca_deployed["member-vnf-index"] and config_position is None:
745 primitive_list.insert(0, {"name": "config", "parameter": []})
746 config_position = 0
747 # for VNF/VDU add verify-ssh-credentials after config
748 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
749 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
750 return primitive_list
751
752 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
753 n2vc_key_list, stage):
754 try:
755 db_nsr_update = {}
756 RO_descriptor_number = 0 # number of descriptors created at RO
757 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
758 nslcmop_id = db_nslcmop["_id"]
759 start_deploy = time()
760 ns_params = db_nslcmop.get("operationParams")
761 if ns_params and ns_params.get("timeout_ns_deploy"):
762 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
763 else:
764 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
765
766 # Check for and optionally request placement optimization. Database will be updated if placement activated
767 stage[2] = "Waiting for Placement."
768 await self._do_placement(logging_text, db_nslcmop, db_vnfrs)
769
770 # deploy RO
771
772 # get vnfds, instantiate at RO
773 for c_vnf in nsd.get("constituent-vnfd", ()):
774 member_vnf_index = c_vnf["member-vnf-index"]
775 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
776 vnfd_ref = vnfd["id"]
777
778 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
779 db_nsr_update["detailed-status"] = " ".join(stage)
780 self.update_db_2("nsrs", nsr_id, db_nsr_update)
781 self._write_op_status(nslcmop_id, stage)
782
783 # self.logger.debug(logging_text + stage[2])
784 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
785 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
786 RO_descriptor_number += 1
787
788 # look position at deployed.RO.vnfd if not present it will be appended at the end
789 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
790 if vnf_deployed["member-vnf-index"] == member_vnf_index:
791 break
792 else:
793 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
794 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
795
796 # look if present
797 RO_update = {"member-vnf-index": member_vnf_index}
798 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
799 if vnfd_list:
800 RO_update["id"] = vnfd_list[0]["uuid"]
801 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
802 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
803 else:
804 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
805 get("additionalParamsForVnf"), nsr_id)
806 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
807 RO_update["id"] = desc["uuid"]
808 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
809 vnfd_ref, member_vnf_index, desc["uuid"]))
810 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
811 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
812
813 # create nsd at RO
814 nsd_ref = nsd["id"]
815
816 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
817 db_nsr_update["detailed-status"] = " ".join(stage)
818 self.update_db_2("nsrs", nsr_id, db_nsr_update)
819 self._write_op_status(nslcmop_id, stage)
820
821 # self.logger.debug(logging_text + stage[2])
822 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
823 RO_descriptor_number += 1
824 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
825 if nsd_list:
826 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
827 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
828 nsd_ref, RO_nsd_uuid))
829 else:
830 nsd_RO = deepcopy(nsd)
831 nsd_RO["id"] = RO_osm_nsd_id
832 nsd_RO.pop("_id", None)
833 nsd_RO.pop("_admin", None)
834 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
835 member_vnf_index = c_vnf["member-vnf-index"]
836 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
837 for c_vld in nsd_RO.get("vld", ()):
838 for cp in c_vld.get("vnfd-connection-point-ref", ()):
839 member_vnf_index = cp["member-vnf-index-ref"]
840 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
841
842 desc = await self.RO.create("nsd", descriptor=nsd_RO)
843 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
844 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
845 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
846 self.update_db_2("nsrs", nsr_id, db_nsr_update)
847
848 # Crate ns at RO
849 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
850 db_nsr_update["detailed-status"] = " ".join(stage)
851 self.update_db_2("nsrs", nsr_id, db_nsr_update)
852 self._write_op_status(nslcmop_id, stage)
853
854 # if present use it unless in error status
855 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
856 if RO_nsr_id:
857 try:
858 stage[2] = "Looking for existing ns at RO"
859 db_nsr_update["detailed-status"] = " ".join(stage)
860 self.update_db_2("nsrs", nsr_id, db_nsr_update)
861 self._write_op_status(nslcmop_id, stage)
862 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
863 desc = await self.RO.show("ns", RO_nsr_id)
864
865 except ROclient.ROClientException as e:
866 if e.http_code != HTTPStatus.NOT_FOUND:
867 raise
868 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
869 if RO_nsr_id:
870 ns_status, ns_status_info = self.RO.check_ns_status(desc)
871 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
872 if ns_status == "ERROR":
873 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
874 self.logger.debug(logging_text + stage[2])
875 await self.RO.delete("ns", RO_nsr_id)
876 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
877 if not RO_nsr_id:
878 stage[2] = "Checking dependencies"
879 db_nsr_update["detailed-status"] = " ".join(stage)
880 self.update_db_2("nsrs", nsr_id, db_nsr_update)
881 self._write_op_status(nslcmop_id, stage)
882 # self.logger.debug(logging_text + stage[2])
883
884 # check if VIM is creating and wait look if previous tasks in process
885 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
886 if task_dependency:
887 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
888 self.logger.debug(logging_text + stage[2])
889 await asyncio.wait(task_dependency, timeout=3600)
890 if ns_params.get("vnf"):
891 for vnf in ns_params["vnf"]:
892 if "vimAccountId" in vnf:
893 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
894 vnf["vimAccountId"])
895 if task_dependency:
896 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
897 self.logger.debug(logging_text + stage[2])
898 await asyncio.wait(task_dependency, timeout=3600)
899
900 stage[2] = "Checking instantiation parameters."
901 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
902 stage[2] = "Deploying ns at VIM."
903 db_nsr_update["detailed-status"] = " ".join(stage)
904 self.update_db_2("nsrs", nsr_id, db_nsr_update)
905 self._write_op_status(nslcmop_id, stage)
906
907 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
908 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
909 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
910 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
911 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
912
913 # wait until NS is ready
914 stage[2] = "Waiting VIM to deploy ns."
915 db_nsr_update["detailed-status"] = " ".join(stage)
916 self.update_db_2("nsrs", nsr_id, db_nsr_update)
917 self._write_op_status(nslcmop_id, stage)
918 detailed_status_old = None
919 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
920
921 old_desc = None
922 while time() <= start_deploy + timeout_ns_deploy:
923 desc = await self.RO.show("ns", RO_nsr_id)
924
925 # deploymentStatus
926 if desc != old_desc:
927 # desc has changed => update db
928 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
929 old_desc = desc
930
931 ns_status, ns_status_info = self.RO.check_ns_status(desc)
932 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
933 if ns_status == "ERROR":
934 raise ROclient.ROClientException(ns_status_info)
935 elif ns_status == "BUILD":
936 stage[2] = "VIM: ({})".format(ns_status_info)
937 elif ns_status == "ACTIVE":
938 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
939 try:
940 self.ns_update_vnfr(db_vnfrs, desc)
941 break
942 except LcmExceptionNoMgmtIP:
943 pass
944 else:
945 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
946 if stage[2] != detailed_status_old:
947 detailed_status_old = stage[2]
948 db_nsr_update["detailed-status"] = " ".join(stage)
949 self.update_db_2("nsrs", nsr_id, db_nsr_update)
950 self._write_op_status(nslcmop_id, stage)
951 await asyncio.sleep(5, loop=self.loop)
952 else: # timeout_ns_deploy
953 raise ROclient.ROClientException("Timeout waiting ns to be ready")
954
955 # Updating NSR
956 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
957
958 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
959 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
960 stage[2] = "Deployed at VIM"
961 db_nsr_update["detailed-status"] = " ".join(stage)
962 self.update_db_2("nsrs", nsr_id, db_nsr_update)
963 self._write_op_status(nslcmop_id, stage)
964 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
965 # self.logger.debug(logging_text + "Deployed at VIM")
966 except (ROclient.ROClientException, LcmException, DbException) as e:
967 stage[2] = "ERROR deploying at VIM"
968 self.set_vnfr_at_error(db_vnfrs, str(e))
969 raise
970
971 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
972 """
973 Wait for ip addres at RO, and optionally, insert public key in virtual machine
974 :param logging_text: prefix use for logging
975 :param nsr_id:
976 :param vnfr_id:
977 :param vdu_id:
978 :param vdu_index:
979 :param pub_key: public ssh key to inject, None to skip
980 :param user: user to apply the public ssh key
981 :return: IP address
982 """
983
984 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
985 ro_nsr_id = None
986 ip_address = None
987 nb_tries = 0
988 target_vdu_id = None
989 ro_retries = 0
990
991 while True:
992
993 ro_retries += 1
994 if ro_retries >= 360: # 1 hour
995 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
996
997 await asyncio.sleep(10, loop=self.loop)
998
999 # get ip address
1000 if not target_vdu_id:
1001 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1002
1003 if not vdu_id: # for the VNF case
1004 if db_vnfr.get("status") == "ERROR":
1005 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1006 ip_address = db_vnfr.get("ip-address")
1007 if not ip_address:
1008 continue
1009 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1010 else: # VDU case
1011 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1012 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1013
1014 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1015 vdur = db_vnfr["vdur"][0]
1016 if not vdur:
1017 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1018 vdu_index))
1019
1020 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1021 ip_address = vdur.get("ip-address")
1022 if not ip_address:
1023 continue
1024 target_vdu_id = vdur["vdu-id-ref"]
1025 elif vdur.get("status") == "ERROR":
1026 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1027
1028 if not target_vdu_id:
1029 continue
1030
1031 # inject public key into machine
1032 if pub_key and user:
1033 # wait until NS is deployed at RO
1034 if not ro_nsr_id:
1035 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1036 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1037 if not ro_nsr_id:
1038 continue
1039
1040 # self.logger.debug(logging_text + "Inserting RO key")
1041 if vdur.get("pdu-type"):
1042 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1043 return ip_address
1044 try:
1045 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1046 result_dict = await self.RO.create_action(
1047 item="ns",
1048 item_id_name=ro_nsr_id,
1049 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1050 )
1051 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1052 if not result_dict or not isinstance(result_dict, dict):
1053 raise LcmException("Unknown response from RO when injecting key")
1054 for result in result_dict.values():
1055 if result.get("vim_result") == 200:
1056 break
1057 else:
1058 raise ROclient.ROClientException("error injecting key: {}".format(
1059 result.get("description")))
1060 break
1061 except ROclient.ROClientException as e:
1062 if not nb_tries:
1063 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1064 format(e, 20*10))
1065 nb_tries += 1
1066 if nb_tries >= 20:
1067 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1068 else:
1069 break
1070
1071 return ip_address
1072
1073 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1074 """
1075 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1076 """
1077 my_vca = vca_deployed_list[vca_index]
1078 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1079 # vdu or kdu: no dependencies
1080 return
1081 timeout = 300
1082 while timeout >= 0:
1083 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1084 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1085 configuration_status_list = db_nsr["configurationStatus"]
1086 for index, vca_deployed in enumerate(configuration_status_list):
1087 if index == vca_index:
1088 # myself
1089 continue
1090 if not my_vca.get("member-vnf-index") or \
1091 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1092 internal_status = configuration_status_list[index].get("status")
1093 if internal_status == 'READY':
1094 continue
1095 elif internal_status == 'BROKEN':
1096 raise LcmException("Configuration aborted because dependent charm/s has failed")
1097 else:
1098 break
1099 else:
1100 # no dependencies, return
1101 return
1102 await asyncio.sleep(10)
1103 timeout -= 1
1104
1105 raise LcmException("Configuration aborted because dependent charm/s timeout")
1106
1107 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1108 config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
1109 nsr_id = db_nsr["_id"]
1110 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1111 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1112 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1113 db_dict = {
1114 'collection': 'nsrs',
1115 'filter': {'_id': nsr_id},
1116 'path': db_update_entry
1117 }
1118 step = ""
1119 try:
1120
1121 element_type = 'NS'
1122 element_under_configuration = nsr_id
1123
1124 vnfr_id = None
1125 if db_vnfr:
1126 vnfr_id = db_vnfr["_id"]
1127
1128 namespace = "{nsi}.{ns}".format(
1129 nsi=nsi_id if nsi_id else "",
1130 ns=nsr_id)
1131
1132 if vnfr_id:
1133 element_type = 'VNF'
1134 element_under_configuration = vnfr_id
1135 namespace += ".{}".format(vnfr_id)
1136 if vdu_id:
1137 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1138 element_type = 'VDU'
1139 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1140 elif kdu_name:
1141 namespace += ".{}".format(kdu_name)
1142 element_type = 'KDU'
1143 element_under_configuration = kdu_name
1144
1145 # Get artifact path
1146 self.fs.sync() # Sync from FSMongo
1147 artifact_path = "{}/{}/charms/{}".format(
1148 base_folder["folder"],
1149 base_folder["pkg-dir"],
1150 config_descriptor["juju"]["charm"]
1151 )
1152
1153 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1154 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1155 is_proxy_charm = False
1156
1157 # n2vc_redesign STEP 3.1
1158
1159 # find old ee_id if exists
1160 ee_id = vca_deployed.get("ee_id")
1161
1162 # create or register execution environment in VCA
1163 if is_proxy_charm:
1164
1165 self._write_configuration_status(
1166 nsr_id=nsr_id,
1167 vca_index=vca_index,
1168 status='CREATING',
1169 element_under_configuration=element_under_configuration,
1170 element_type=element_type
1171 )
1172
1173 step = "create execution environment"
1174 self.logger.debug(logging_text + step)
1175 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1176 reuse_ee_id=ee_id,
1177 db_dict=db_dict)
1178
1179 else:
1180 step = "Waiting to VM being up and getting IP address"
1181 self.logger.debug(logging_text + step)
1182 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1183 user=None, pub_key=None)
1184 credentials = {"hostname": rw_mgmt_ip}
1185 # get username
1186 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1187 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1188 # merged. Meanwhile let's get username from initial-config-primitive
1189 if not username and config_descriptor.get("initial-config-primitive"):
1190 for config_primitive in config_descriptor["initial-config-primitive"]:
1191 for param in config_primitive.get("parameter", ()):
1192 if param["name"] == "ssh-username":
1193 username = param["value"]
1194 break
1195 if not username:
1196 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1197 "'config-access.ssh-access.default-user'")
1198 credentials["username"] = username
1199 # n2vc_redesign STEP 3.2
1200
1201 self._write_configuration_status(
1202 nsr_id=nsr_id,
1203 vca_index=vca_index,
1204 status='REGISTERING',
1205 element_under_configuration=element_under_configuration,
1206 element_type=element_type
1207 )
1208
1209 step = "register execution environment {}".format(credentials)
1210 self.logger.debug(logging_text + step)
1211 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1212 db_dict=db_dict)
1213
1214 # for compatibility with MON/POL modules, the need model and application name at database
1215 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1216 ee_id_parts = ee_id.split('.')
1217 model_name = ee_id_parts[0]
1218 application_name = ee_id_parts[1]
1219 db_nsr_update = {db_update_entry + "model": model_name,
1220 db_update_entry + "application": application_name,
1221 db_update_entry + "ee_id": ee_id}
1222
1223 # n2vc_redesign STEP 3.3
1224
1225 step = "Install configuration Software"
1226
1227 self._write_configuration_status(
1228 nsr_id=nsr_id,
1229 vca_index=vca_index,
1230 status='INSTALLING SW',
1231 element_under_configuration=element_under_configuration,
1232 element_type=element_type,
1233 other_update=db_nsr_update
1234 )
1235
1236 # TODO check if already done
1237 self.logger.debug(logging_text + step)
1238 config = None
1239 if not is_proxy_charm:
1240 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1241 if initial_config_primitive_list:
1242 for primitive in initial_config_primitive_list:
1243 if primitive["name"] == "config":
1244 config = self._map_primitive_params(
1245 primitive,
1246 {},
1247 deploy_params
1248 )
1249 break
1250 await self.n2vc.install_configuration_sw(
1251 ee_id=ee_id,
1252 artifact_path=artifact_path,
1253 db_dict=db_dict,
1254 config=config
1255 )
1256
1257 # write in db flag of configuration_sw already installed
1258 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1259
1260 # add relations for this VCA (wait for other peers related with this VCA)
1261 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)
1262
1263 # if SSH access is required, then get execution environment SSH public
1264 if is_proxy_charm: # if native charm we have waited already to VM be UP
1265 pub_key = None
1266 user = None
1267 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1268 # Needed to inject a ssh key
1269 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1270 step = "Install configuration Software, getting public ssh key"
1271 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1272
1273 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1274 else:
1275 step = "Waiting to VM being up and getting IP address"
1276 self.logger.debug(logging_text + step)
1277
1278 # n2vc_redesign STEP 5.1
1279 # wait for RO (ip-address) Insert pub_key into VM
1280 if vnfr_id:
1281 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1282 user=user, pub_key=pub_key)
1283 else:
1284 rw_mgmt_ip = None # This is for a NS configuration
1285
1286 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1287
1288 # store rw_mgmt_ip in deploy params for later replacement
1289 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1290
1291 # n2vc_redesign STEP 6 Execute initial config primitive
1292 step = 'execute initial config primitive'
1293 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1294
1295 # sort initial config primitives by 'seq'
1296 if initial_config_primitive_list:
1297 try:
1298 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1299 except Exception as e:
1300 self.logger.error(logging_text + step + ": " + str(e))
1301 else:
1302 self.logger.debug(logging_text + step + ": No initial-config-primitive")
1303
1304 # add config if not present for NS charm
1305 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1306 vca_deployed)
1307
1308 # wait for dependent primitives execution (NS -> VNF -> VDU)
1309 if initial_config_primitive_list:
1310 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1311
1312 # stage, in function of element type: vdu, kdu, vnf or ns
1313 my_vca = vca_deployed_list[vca_index]
1314 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1315 # VDU or KDU
1316 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1317 elif my_vca.get("member-vnf-index"):
1318 # VNF
1319 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1320 else:
1321 # NS
1322 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1323
1324 self._write_configuration_status(
1325 nsr_id=nsr_id,
1326 vca_index=vca_index,
1327 status='EXECUTING PRIMITIVE'
1328 )
1329
1330 self._write_op_status(
1331 op_id=nslcmop_id,
1332 stage=stage
1333 )
1334
1335 check_if_terminated_needed = True
1336 for initial_config_primitive in initial_config_primitive_list:
1337 # adding information on the vca_deployed if it is a NS execution environment
1338 if not vca_deployed["member-vnf-index"]:
1339 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1340 # TODO check if already done
1341 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1342
1343 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1344 self.logger.debug(logging_text + step)
1345 await self.n2vc.exec_primitive(
1346 ee_id=ee_id,
1347 primitive_name=initial_config_primitive["name"],
1348 params_dict=primitive_params_,
1349 db_dict=db_dict
1350 )
1351 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1352 if check_if_terminated_needed:
1353 if config_descriptor.get('terminate-config-primitive'):
1354 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1355 check_if_terminated_needed = False
1356
1357 # TODO register in database that primitive is done
1358
1359 step = "instantiated at VCA"
1360 self.logger.debug(logging_text + step)
1361
1362 self._write_configuration_status(
1363 nsr_id=nsr_id,
1364 vca_index=vca_index,
1365 status='READY'
1366 )
1367
1368 except Exception as e: # TODO not use Exception but N2VC exception
1369 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1370 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1371 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1372 self._write_configuration_status(
1373 nsr_id=nsr_id,
1374 vca_index=vca_index,
1375 status='BROKEN'
1376 )
1377 raise LcmException("{} {}".format(step, e)) from e
1378
1379 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1380 error_description: str = None, error_detail: str = None, other_update: dict = None):
1381 """
1382 Update db_nsr fields.
1383 :param nsr_id:
1384 :param ns_state:
1385 :param current_operation:
1386 :param current_operation_id:
1387 :param error_description:
1388 :param error_detail:
1389 :param other_update: Other required changes at database if provided, will be cleared
1390 :return:
1391 """
1392 try:
1393 db_dict = other_update or {}
1394 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1395 db_dict["_admin.current-operation"] = current_operation_id
1396 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1397 db_dict["currentOperation"] = current_operation
1398 db_dict["currentOperationID"] = current_operation_id
1399 db_dict["errorDescription"] = error_description
1400 db_dict["errorDetail"] = error_detail
1401
1402 if ns_state:
1403 db_dict["nsState"] = ns_state
1404 self.update_db_2("nsrs", nsr_id, db_dict)
1405 except DbException as e:
1406 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1407
1408 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1409 operation_state: str = None, other_update: dict = None):
1410 try:
1411 db_dict = other_update or {}
1412 db_dict['queuePosition'] = queuePosition
1413 if isinstance(stage, list):
1414 db_dict['stage'] = stage[0]
1415 db_dict['detailed-status'] = " ".join(stage)
1416 elif stage is not None:
1417 db_dict['stage'] = str(stage)
1418
1419 if error_message is not None:
1420 db_dict['errorMessage'] = error_message
1421 if operation_state is not None:
1422 db_dict['operationState'] = operation_state
1423 db_dict["statusEnteredTime"] = time()
1424 self.update_db_2("nslcmops", op_id, db_dict)
1425 except DbException as e:
1426 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1427
1428 def _write_all_config_status(self, db_nsr: dict, status: str):
1429 try:
1430 nsr_id = db_nsr["_id"]
1431 # configurationStatus
1432 config_status = db_nsr.get('configurationStatus')
1433 if config_status:
1434 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1435 enumerate(config_status) if v}
1436 # update status
1437 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1438
1439 except DbException as e:
1440 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1441
1442 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1443 element_under_configuration: str = None, element_type: str = None,
1444 other_update: dict = None):
1445
1446 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1447 # .format(vca_index, status))
1448
1449 try:
1450 db_path = 'configurationStatus.{}.'.format(vca_index)
1451 db_dict = other_update or {}
1452 if status:
1453 db_dict[db_path + 'status'] = status
1454 if element_under_configuration:
1455 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1456 if element_type:
1457 db_dict[db_path + 'elementType'] = element_type
1458 self.update_db_2("nsrs", nsr_id, db_dict)
1459 except DbException as e:
1460 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1461 .format(status, nsr_id, vca_index, e))
1462
1463 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1464 """
1465 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1466 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1467 Database is used because the result can be obtained from a different LCM worker in case of HA.
1468 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1469 :param db_nslcmop: database content of nslcmop
1470 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1471 :return: None. Modifies database vnfrs and parameter db_vnfr with the computed 'vim-account-id'
1472 """
1473 nslcmop_id = db_nslcmop['_id']
1474 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1475 if placement_engine == "PLA":
1476 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1477 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1478 db_poll_interval = 5
1479 wait = db_poll_interval * 10
1480 pla_result = None
1481 while not pla_result and wait >= 0:
1482 await asyncio.sleep(db_poll_interval)
1483 wait -= db_poll_interval
1484 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1485 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1486
1487 if not pla_result:
1488 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1489
1490 for pla_vnf in pla_result['vnf']:
1491 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1492 if not pla_vnf.get('vimAccountId') or not vnfr:
1493 continue
1494 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1495 # Modifies db_vnfrs
1496 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1497 return
1498
1499 def update_nsrs_with_pla_result(self, params):
1500 try:
1501 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1502 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1503 except Exception as e:
1504 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1505
1506 async def instantiate(self, nsr_id, nslcmop_id):
1507 """
1508
1509 :param nsr_id: ns instance to deploy
1510 :param nslcmop_id: operation to run
1511 :return:
1512 """
1513
1514 # Try to lock HA task here
1515 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1516 if not task_is_locked_by_me:
1517 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1518 return
1519
1520 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1521 self.logger.debug(logging_text + "Enter")
1522
1523 # get all needed from database
1524
1525 # database nsrs record
1526 db_nsr = None
1527
1528 # database nslcmops record
1529 db_nslcmop = None
1530
1531 # update operation on nsrs
1532 db_nsr_update = {}
1533 # update operation on nslcmops
1534 db_nslcmop_update = {}
1535
1536 nslcmop_operation_state = None
1537 db_vnfrs = {} # vnf's info indexed by member-index
1538 # n2vc_info = {}
1539 tasks_dict_info = {} # from task to info text
1540 exc = None
1541 error_list = []
1542 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1543 # ^ stage, step, VIM progress
1544 try:
1545 # wait for any previous tasks in process
1546 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1547
1548 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1549 stage[1] = "Reading from database,"
1550 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1551 db_nsr_update["detailed-status"] = "creating"
1552 db_nsr_update["operational-status"] = "init"
1553 self._write_ns_status(
1554 nsr_id=nsr_id,
1555 ns_state="BUILDING",
1556 current_operation="INSTANTIATING",
1557 current_operation_id=nslcmop_id,
1558 other_update=db_nsr_update
1559 )
1560 self._write_op_status(
1561 op_id=nslcmop_id,
1562 stage=stage,
1563 queuePosition=0
1564 )
1565
1566 # read from db: operation
1567 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1568 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1569 ns_params = db_nslcmop.get("operationParams")
1570 if ns_params and ns_params.get("timeout_ns_deploy"):
1571 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1572 else:
1573 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1574
1575 # read from db: ns
1576 stage[1] = "Getting nsr={} from db".format(nsr_id)
1577 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1578 # nsd is replicated into ns (no db read)
1579 nsd = db_nsr["nsd"]
1580 # nsr_name = db_nsr["name"] # TODO short-name??
1581
1582 # read from db: vnf's of this ns
1583 stage[1] = "Getting vnfrs from db"
1584 self.logger.debug(logging_text + stage[1])
1585 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1586
1587 # read from db: vnfd's for every vnf
1588 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1589 db_vnfds = {} # every vnfd data indexed by vnf id
1590 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1591
1592 # for each vnf in ns, read vnfd
1593 for vnfr in db_vnfrs_list:
1594 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1595 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1596 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1597 # if we haven't this vnfd, read it from db
1598 if vnfd_id not in db_vnfds:
1599 # read from db
1600 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1601 self.logger.debug(logging_text + stage[1])
1602 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1603
1604 # store vnfd
1605 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1606 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1607 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1608
1609 # Get or generates the _admin.deployed.VCA list
1610 vca_deployed_list = None
1611 if db_nsr["_admin"].get("deployed"):
1612 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1613 if vca_deployed_list is None:
1614 vca_deployed_list = []
1615 configuration_status_list = []
1616 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1617 db_nsr_update["configurationStatus"] = configuration_status_list
1618 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1619 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1620 elif isinstance(vca_deployed_list, dict):
1621 # maintain backward compatibility. Change a dict to list at database
1622 vca_deployed_list = list(vca_deployed_list.values())
1623 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1624 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1625
1626 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1627 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1628 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1629
1630 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1631 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1632 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1633
1634 # n2vc_redesign STEP 2 Deploy Network Scenario
1635 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1636 self._write_op_status(
1637 op_id=nslcmop_id,
1638 stage=stage
1639 )
1640
1641 stage[1] = "Deploying KDUs,"
1642 # self.logger.debug(logging_text + "Before deploy_kdus")
1643 # Call to deploy_kdus in case exists the "vdu:kdu" param
1644 await self.deploy_kdus(
1645 logging_text=logging_text,
1646 nsr_id=nsr_id,
1647 nslcmop_id=nslcmop_id,
1648 db_vnfrs=db_vnfrs,
1649 db_vnfds=db_vnfds,
1650 task_instantiation_info=tasks_dict_info,
1651 )
1652
1653 stage[1] = "Getting VCA public key."
1654 # n2vc_redesign STEP 1 Get VCA public ssh-key
1655 # feature 1429. Add n2vc public key to needed VMs
1656 n2vc_key = self.n2vc.get_public_key()
1657 n2vc_key_list = [n2vc_key]
1658 if self.vca_config.get("public_key"):
1659 n2vc_key_list.append(self.vca_config["public_key"])
1660
1661 stage[1] = "Deploying NS at VIM."
1662 task_ro = asyncio.ensure_future(
1663 self.instantiate_RO(
1664 logging_text=logging_text,
1665 nsr_id=nsr_id,
1666 nsd=nsd,
1667 db_nsr=db_nsr,
1668 db_nslcmop=db_nslcmop,
1669 db_vnfrs=db_vnfrs,
1670 db_vnfds_ref=db_vnfds_ref,
1671 n2vc_key_list=n2vc_key_list,
1672 stage=stage
1673 )
1674 )
1675 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1676 tasks_dict_info[task_ro] = "Deploying at VIM"
1677
1678 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1679 stage[1] = "Deploying Execution Environments."
1680 self.logger.debug(logging_text + stage[1])
1681
1682 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1683 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1684 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1685 vnfd_id = c_vnf["vnfd-id-ref"]
1686 vnfd = db_vnfds_ref[vnfd_id]
1687 member_vnf_index = str(c_vnf["member-vnf-index"])
1688 db_vnfr = db_vnfrs[member_vnf_index]
1689 base_folder = vnfd["_admin"]["storage"]
1690 vdu_id = None
1691 vdu_index = 0
1692 vdu_name = None
1693 kdu_name = None
1694
1695 # Get additional parameters
1696 deploy_params = {}
1697 if db_vnfr.get("additionalParamsForVnf"):
1698 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1699
1700 descriptor_config = vnfd.get("vnf-configuration")
1701 if descriptor_config and descriptor_config.get("juju"):
1702 self._deploy_n2vc(
1703 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1704 db_nsr=db_nsr,
1705 db_vnfr=db_vnfr,
1706 nslcmop_id=nslcmop_id,
1707 nsr_id=nsr_id,
1708 nsi_id=nsi_id,
1709 vnfd_id=vnfd_id,
1710 vdu_id=vdu_id,
1711 kdu_name=kdu_name,
1712 member_vnf_index=member_vnf_index,
1713 vdu_index=vdu_index,
1714 vdu_name=vdu_name,
1715 deploy_params=deploy_params,
1716 descriptor_config=descriptor_config,
1717 base_folder=base_folder,
1718 task_instantiation_info=tasks_dict_info,
1719 stage=stage
1720 )
1721
1722 # Deploy charms for each VDU that supports one.
1723 for vdud in get_iterable(vnfd, 'vdu'):
1724 vdu_id = vdud["id"]
1725 descriptor_config = vdud.get('vdu-configuration')
1726 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1727 if vdur.get("additionalParams"):
1728 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1729 else:
1730 deploy_params_vdu = deploy_params
1731 if descriptor_config and descriptor_config.get("juju"):
1732 # look for vdu index in the db_vnfr["vdu"] section
1733 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1734 # if vdur["vdu-id-ref"] == vdu_id:
1735 # break
1736 # else:
1737 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1738 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1739 # vdu_name = vdur.get("name")
1740 vdu_name = None
1741 kdu_name = None
1742 for vdu_index in range(int(vdud.get("count", 1))):
1743 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1744 self._deploy_n2vc(
1745 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1746 member_vnf_index, vdu_id, vdu_index),
1747 db_nsr=db_nsr,
1748 db_vnfr=db_vnfr,
1749 nslcmop_id=nslcmop_id,
1750 nsr_id=nsr_id,
1751 nsi_id=nsi_id,
1752 vnfd_id=vnfd_id,
1753 vdu_id=vdu_id,
1754 kdu_name=kdu_name,
1755 member_vnf_index=member_vnf_index,
1756 vdu_index=vdu_index,
1757 vdu_name=vdu_name,
1758 deploy_params=deploy_params_vdu,
1759 descriptor_config=descriptor_config,
1760 base_folder=base_folder,
1761 task_instantiation_info=tasks_dict_info,
1762 stage=stage
1763 )
1764 for kdud in get_iterable(vnfd, 'kdu'):
1765 kdu_name = kdud["name"]
1766 descriptor_config = kdud.get('kdu-configuration')
1767 if descriptor_config and descriptor_config.get("juju"):
1768 vdu_id = None
1769 vdu_index = 0
1770 vdu_name = None
1771 # look for vdu index in the db_vnfr["vdu"] section
1772 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1773 # if vdur["vdu-id-ref"] == vdu_id:
1774 # break
1775 # else:
1776 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1777 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1778 # vdu_name = vdur.get("name")
1779 # vdu_name = None
1780
1781 self._deploy_n2vc(
1782 logging_text=logging_text,
1783 db_nsr=db_nsr,
1784 db_vnfr=db_vnfr,
1785 nslcmop_id=nslcmop_id,
1786 nsr_id=nsr_id,
1787 nsi_id=nsi_id,
1788 vnfd_id=vnfd_id,
1789 vdu_id=vdu_id,
1790 kdu_name=kdu_name,
1791 member_vnf_index=member_vnf_index,
1792 vdu_index=vdu_index,
1793 vdu_name=vdu_name,
1794 deploy_params=deploy_params,
1795 descriptor_config=descriptor_config,
1796 base_folder=base_folder,
1797 task_instantiation_info=tasks_dict_info,
1798 stage=stage
1799 )
1800
1801 # Check if this NS has a charm configuration
1802 descriptor_config = nsd.get("ns-configuration")
1803 if descriptor_config and descriptor_config.get("juju"):
1804 vnfd_id = None
1805 db_vnfr = None
1806 member_vnf_index = None
1807 vdu_id = None
1808 kdu_name = None
1809 vdu_index = 0
1810 vdu_name = None
1811
1812 # Get additional parameters
1813 deploy_params = {}
1814 if db_nsr.get("additionalParamsForNs"):
1815 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1816 base_folder = nsd["_admin"]["storage"]
1817 self._deploy_n2vc(
1818 logging_text=logging_text,
1819 db_nsr=db_nsr,
1820 db_vnfr=db_vnfr,
1821 nslcmop_id=nslcmop_id,
1822 nsr_id=nsr_id,
1823 nsi_id=nsi_id,
1824 vnfd_id=vnfd_id,
1825 vdu_id=vdu_id,
1826 kdu_name=kdu_name,
1827 member_vnf_index=member_vnf_index,
1828 vdu_index=vdu_index,
1829 vdu_name=vdu_name,
1830 deploy_params=deploy_params,
1831 descriptor_config=descriptor_config,
1832 base_folder=base_folder,
1833 task_instantiation_info=tasks_dict_info,
1834 stage=stage
1835 )
1836
1837 # rest of staff will be done at finally
1838
1839 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1840 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1841 exc = e
1842 except asyncio.CancelledError:
1843 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1844 exc = "Operation was cancelled"
1845 except Exception as e:
1846 exc = traceback.format_exc()
1847 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1848 finally:
1849 if exc:
1850 error_list.append(str(exc))
1851 try:
1852 # wait for pending tasks
1853 if tasks_dict_info:
1854 stage[1] = "Waiting for instantiate pending tasks."
1855 self.logger.debug(logging_text + stage[1])
1856 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1857 stage, nslcmop_id, nsr_id=nsr_id)
1858 stage[1] = stage[2] = ""
1859 except asyncio.CancelledError:
1860 error_list.append("Cancelled")
1861 # TODO cancel all tasks
1862 except Exception as exc:
1863 error_list.append(str(exc))
1864
1865 # update operation-status
1866 db_nsr_update["operational-status"] = "running"
1867 # let's begin with VCA 'configured' status (later we can change it)
1868 db_nsr_update["config-status"] = "configured"
1869 for task, task_name in tasks_dict_info.items():
1870 if not task.done() or task.cancelled() or task.exception():
1871 if task_name.startswith(self.task_name_deploy_vca):
1872 # A N2VC task is pending
1873 db_nsr_update["config-status"] = "failed"
1874 else:
1875 # RO or KDU task is pending
1876 db_nsr_update["operational-status"] = "failed"
1877
1878 # update status at database
1879 if error_list:
1880 error_detail = ". ".join(error_list)
1881 self.logger.error(logging_text + error_detail)
1882 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
1883 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
1884
1885 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1886 db_nslcmop_update["detailed-status"] = error_detail
1887 nslcmop_operation_state = "FAILED"
1888 ns_state = "BROKEN"
1889 else:
1890 error_detail = None
1891 error_description_nsr = error_description_nslcmop = None
1892 ns_state = "READY"
1893 db_nsr_update["detailed-status"] = "Done"
1894 db_nslcmop_update["detailed-status"] = "Done"
1895 nslcmop_operation_state = "COMPLETED"
1896
1897 if db_nsr:
1898 self._write_ns_status(
1899 nsr_id=nsr_id,
1900 ns_state=ns_state,
1901 current_operation="IDLE",
1902 current_operation_id=None,
1903 error_description=error_description_nsr,
1904 error_detail=error_detail,
1905 other_update=db_nsr_update
1906 )
1907 if db_nslcmop:
1908 self._write_op_status(
1909 op_id=nslcmop_id,
1910 stage="",
1911 error_message=error_description_nslcmop,
1912 operation_state=nslcmop_operation_state,
1913 other_update=db_nslcmop_update,
1914 )
1915
1916 if nslcmop_operation_state:
1917 try:
1918 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1919 "operationState": nslcmop_operation_state},
1920 loop=self.loop)
1921 except Exception as e:
1922 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1923
1924 self.logger.debug(logging_text + "Exit")
1925 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1926
1927 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
1928
1929 # steps:
1930 # 1. find all relations for this VCA
1931 # 2. wait for other peers related
1932 # 3. add relations
1933
1934 try:
1935
1936 # STEP 1: find all relations for this VCA
1937
1938 # read nsr record
1939 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1940
1941 # this VCA data
1942 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
1943
1944 # read all ns-configuration relations
1945 ns_relations = list()
1946 db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
1947 if db_ns_relations:
1948 for r in db_ns_relations:
1949 # check if this VCA is in the relation
1950 if my_vca.get('member-vnf-index') in\
1951 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1952 ns_relations.append(r)
1953
1954 # read all vnf-configuration relations
1955 vnf_relations = list()
1956 db_vnfd_list = db_nsr.get('vnfd-id')
1957 if db_vnfd_list:
1958 for vnfd in db_vnfd_list:
1959 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
1960 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
1961 if db_vnf_relations:
1962 for r in db_vnf_relations:
1963 # check if this VCA is in the relation
1964 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1965 vnf_relations.append(r)
1966
1967 # if no relations, terminate
1968 if not ns_relations and not vnf_relations:
1969 self.logger.debug(logging_text + ' No relations')
1970 return True
1971
1972 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
1973
1974 # add all relations
1975 start = time()
1976 while True:
1977 # check timeout
1978 now = time()
1979 if now - start >= timeout:
1980 self.logger.error(logging_text + ' : timeout adding relations')
1981 return False
1982
1983 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
1984 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1985
1986 # for each defined NS relation, find the VCA's related
1987 for r in ns_relations:
1988 from_vca_ee_id = None
1989 to_vca_ee_id = None
1990 from_vca_endpoint = None
1991 to_vca_endpoint = None
1992 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
1993 for vca in vca_list:
1994 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
1995 and vca.get('config_sw_installed'):
1996 from_vca_ee_id = vca.get('ee_id')
1997 from_vca_endpoint = r.get('entities')[0].get('endpoint')
1998 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
1999 and vca.get('config_sw_installed'):
2000 to_vca_ee_id = vca.get('ee_id')
2001 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2002 if from_vca_ee_id and to_vca_ee_id:
2003 # add relation
2004 await self.n2vc.add_relation(
2005 ee_id_1=from_vca_ee_id,
2006 ee_id_2=to_vca_ee_id,
2007 endpoint_1=from_vca_endpoint,
2008 endpoint_2=to_vca_endpoint)
2009 # remove entry from relations list
2010 ns_relations.remove(r)
2011 else:
2012 # check failed peers
2013 try:
2014 vca_status_list = db_nsr.get('configurationStatus')
2015 if vca_status_list:
2016 for i in range(len(vca_list)):
2017 vca = vca_list[i]
2018 vca_status = vca_status_list[i]
2019 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2020 if vca_status.get('status') == 'BROKEN':
2021 # peer broken: remove relation from list
2022 ns_relations.remove(r)
2023 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2024 if vca_status.get('status') == 'BROKEN':
2025 # peer broken: remove relation from list
2026 ns_relations.remove(r)
2027 except Exception:
2028 # ignore
2029 pass
2030
2031 # for each defined VNF relation, find the VCA's related
2032 for r in vnf_relations:
2033 from_vca_ee_id = None
2034 to_vca_ee_id = None
2035 from_vca_endpoint = None
2036 to_vca_endpoint = None
2037 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2038 for vca in vca_list:
2039 if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2040 from_vca_ee_id = vca.get('ee_id')
2041 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2042 if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2043 to_vca_ee_id = vca.get('ee_id')
2044 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2045 if from_vca_ee_id and to_vca_ee_id:
2046 # add relation
2047 await self.n2vc.add_relation(
2048 ee_id_1=from_vca_ee_id,
2049 ee_id_2=to_vca_ee_id,
2050 endpoint_1=from_vca_endpoint,
2051 endpoint_2=to_vca_endpoint)
2052 # remove entry from relations list
2053 vnf_relations.remove(r)
2054 else:
2055 # check failed peers
2056 try:
2057 vca_status_list = db_nsr.get('configurationStatus')
2058 if vca_status_list:
2059 for i in range(len(vca_list)):
2060 vca = vca_list[i]
2061 vca_status = vca_status_list[i]
2062 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2063 if vca_status.get('status') == 'BROKEN':
2064 # peer broken: remove relation from list
2065 ns_relations.remove(r)
2066 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2067 if vca_status.get('status') == 'BROKEN':
2068 # peer broken: remove relation from list
2069 ns_relations.remove(r)
2070 except Exception:
2071 # ignore
2072 pass
2073
2074 # wait for next try
2075 await asyncio.sleep(5.0)
2076
2077 if not ns_relations and not vnf_relations:
2078 self.logger.debug('Relations added')
2079 break
2080
2081 return True
2082
2083 except Exception as e:
2084 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2085 return False
2086
2087 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2088 # Launch kdus if present in the descriptor
2089
2090 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
2091
2092 def _get_cluster_id(cluster_id, cluster_type):
2093 nonlocal k8scluster_id_2_uuic
2094 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2095 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2096
2097 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2098 if not db_k8scluster:
2099 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2100 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2101 if not k8s_id:
2102 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
2103 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2104 return k8s_id
2105
2106 logging_text += "Deploy kdus: "
2107 step = ""
2108 try:
2109 db_nsr_update = {"_admin.deployed.K8s": []}
2110 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2111
2112 index = 0
2113 updated_cluster_list = []
2114
2115 for vnfr_data in db_vnfrs.values():
2116 for kdur in get_iterable(vnfr_data, "kdur"):
2117 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2118 vnfd_id = vnfr_data.get('vnfd-id')
2119 namespace = kdur.get("k8s-namespace")
2120 if kdur.get("helm-chart"):
2121 kdumodel = kdur["helm-chart"]
2122 k8sclustertype = "helm-chart"
2123 elif kdur.get("juju-bundle"):
2124 kdumodel = kdur["juju-bundle"]
2125 k8sclustertype = "juju-bundle"
2126 else:
2127 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2128 "juju-bundle. Maybe an old NBI version is running".
2129 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2130 # check if kdumodel is a file and exists
2131 try:
2132 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2133 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2134 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2135 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["'pkg-dir"], k8sclustertype,
2136 kdumodel)
2137 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2138 kdumodel = self.fs.path + filename
2139 except (asyncio.TimeoutError, asyncio.CancelledError):
2140 raise
2141 except Exception: # it is not a file
2142 pass
2143
2144 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2145 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2146 cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
2147
2148 if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
2149 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2150 self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
2151 if del_repo_list or added_repo_dict:
2152 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2153 updated = {'_admin.helm_charts_added.' +
2154 item: name for item, name in added_repo_dict.items()}
2155 self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
2156 "to_add: {}".format(k8s_cluster_id, del_repo_list,
2157 added_repo_dict))
2158 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2159 updated_cluster_list.append(cluster_uuid)
2160
2161 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2162 kdur["kdu-name"], k8s_cluster_id)
2163
2164 k8s_instace_info = {"kdu-instance": None,
2165 "k8scluster-uuid": cluster_uuid,
2166 "k8scluster-type": k8sclustertype,
2167 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2168 "kdu-name": kdur["kdu-name"],
2169 "kdu-model": kdumodel,
2170 "namespace": namespace}
2171 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
2172 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2173
2174 db_dict = {"collection": "nsrs",
2175 "filter": {"_id": nsr_id},
2176 "path": "_admin.deployed.K8s.{}".format(index)}
2177
2178 task = asyncio.ensure_future(
2179 self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
2180 atomic=True, params=desc_params,
2181 db_dict=db_dict, timeout=600,
2182 kdu_name=kdur["kdu-name"], namespace=namespace))
2183
2184 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2185 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2186
2187 index += 1
2188
2189 except (LcmException, asyncio.CancelledError):
2190 raise
2191 except Exception as e:
2192 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2193 if isinstance(e, (N2VCException, DbException)):
2194 self.logger.error(logging_text + msg)
2195 else:
2196 self.logger.critical(logging_text + msg, exc_info=True)
2197 raise LcmException(msg)
2198 finally:
2199 if db_nsr_update:
2200 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2201
2202 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2203 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2204 base_folder, task_instantiation_info, stage):
2205 # launch instantiate_N2VC in a asyncio task and register task object
2206 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2207 # if not found, create one entry and update database
2208
2209 # fill db_nsr._admin.deployed.VCA.<index>
2210 vca_index = -1
2211 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2212 if not vca_deployed:
2213 continue
2214 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2215 vca_deployed.get("vdu_id") == vdu_id and \
2216 vca_deployed.get("kdu_name") == kdu_name and \
2217 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2218 break
2219 else:
2220 # not found, create one.
2221 vca_deployed = {
2222 "member-vnf-index": member_vnf_index,
2223 "vdu_id": vdu_id,
2224 "kdu_name": kdu_name,
2225 "vdu_count_index": vdu_index,
2226 "operational-status": "init", # TODO revise
2227 "detailed-status": "", # TODO revise
2228 "step": "initial-deploy", # TODO revise
2229 "vnfd_id": vnfd_id,
2230 "vdu_name": vdu_name,
2231 }
2232 vca_index += 1
2233
2234 # create VCA and configurationStatus in db
2235 db_dict = {
2236 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2237 "configurationStatus.{}".format(vca_index): dict()
2238 }
2239 self.update_db_2("nsrs", nsr_id, db_dict)
2240
2241 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2242
2243 # Launch task
2244 task_n2vc = asyncio.ensure_future(
2245 self.instantiate_N2VC(
2246 logging_text=logging_text,
2247 vca_index=vca_index,
2248 nsi_id=nsi_id,
2249 db_nsr=db_nsr,
2250 db_vnfr=db_vnfr,
2251 vdu_id=vdu_id,
2252 kdu_name=kdu_name,
2253 vdu_index=vdu_index,
2254 deploy_params=deploy_params,
2255 config_descriptor=descriptor_config,
2256 base_folder=base_folder,
2257 nslcmop_id=nslcmop_id,
2258 stage=stage
2259 )
2260 )
2261 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2262 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2263 member_vnf_index or "", vdu_id or "")
2264
2265 # Check if this VNFD has a configured terminate action
2266 def _has_terminate_config_primitive(self, vnfd):
2267 vnf_config = vnfd.get("vnf-configuration")
2268 if vnf_config and vnf_config.get("terminate-config-primitive"):
2269 return True
2270 else:
2271 return False
2272
2273 @staticmethod
2274 def _get_terminate_config_primitive_seq_list(vnfd):
2275 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2276 # No need to check for existing primitive twice, already done before
2277 vnf_config = vnfd.get("vnf-configuration")
2278 seq_list = vnf_config.get("terminate-config-primitive")
2279 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2280 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2281 return seq_list_sorted
2282
2283 @staticmethod
2284 def _create_nslcmop(nsr_id, operation, params):
2285 """
2286 Creates a ns-lcm-opp content to be stored at database.
2287 :param nsr_id: internal id of the instance
2288 :param operation: instantiate, terminate, scale, action, ...
2289 :param params: user parameters for the operation
2290 :return: dictionary following SOL005 format
2291 """
2292 # Raise exception if invalid arguments
2293 if not (nsr_id and operation and params):
2294 raise LcmException(
2295 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2296 now = time()
2297 _id = str(uuid4())
2298 nslcmop = {
2299 "id": _id,
2300 "_id": _id,
2301 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2302 "operationState": "PROCESSING",
2303 "statusEnteredTime": now,
2304 "nsInstanceId": nsr_id,
2305 "lcmOperationType": operation,
2306 "startTime": now,
2307 "isAutomaticInvocation": False,
2308 "operationParams": params,
2309 "isCancelPending": False,
2310 "links": {
2311 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2312 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2313 }
2314 }
2315 return nslcmop
2316
2317 def _format_additional_params(self, params):
2318 params = params or {}
2319 for key, value in params.items():
2320 if str(value).startswith("!!yaml "):
2321 params[key] = yaml.safe_load(value[7:])
2322 return params
2323
2324 def _get_terminate_primitive_params(self, seq, vnf_index):
2325 primitive = seq.get('name')
2326 primitive_params = {}
2327 params = {
2328 "member_vnf_index": vnf_index,
2329 "primitive": primitive,
2330 "primitive_params": primitive_params,
2331 }
2332 desc_params = {}
2333 return self._map_primitive_params(seq, params, desc_params)
2334
2335 # sub-operations
2336
2337 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2338 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2339 if op.get('operationState') == 'COMPLETED':
2340 # b. Skip sub-operation
2341 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2342 return self.SUBOPERATION_STATUS_SKIP
2343 else:
2344 # c. Reintent executing sub-operation
2345 # The sub-operation exists, and operationState != 'COMPLETED'
2346 # Update operationState = 'PROCESSING' to indicate a reintent.
2347 operationState = 'PROCESSING'
2348 detailed_status = 'In progress'
2349 self._update_suboperation_status(
2350 db_nslcmop, op_index, operationState, detailed_status)
2351 # Return the sub-operation index
2352 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2353 # with arguments extracted from the sub-operation
2354 return op_index
2355
2356 # Find a sub-operation where all keys in a matching dictionary must match
2357 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2358 def _find_suboperation(self, db_nslcmop, match):
2359 if (db_nslcmop and match):
2360 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2361 for i, op in enumerate(op_list):
2362 if all(op.get(k) == match[k] for k in match):
2363 return i
2364 return self.SUBOPERATION_STATUS_NOT_FOUND
2365
2366 # Update status for a sub-operation given its index
2367 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2368 # Update DB for HA tasks
2369 q_filter = {'_id': db_nslcmop['_id']}
2370 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2371 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2372 self.db.set_one("nslcmops",
2373 q_filter=q_filter,
2374 update_dict=update_dict,
2375 fail_on_empty=False)
2376
2377 # Add sub-operation, return the index of the added sub-operation
2378 # Optionally, set operationState, detailed-status, and operationType
2379 # Status and type are currently set for 'scale' sub-operations:
2380 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2381 # 'detailed-status' : status message
2382 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2383 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2384 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2385 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2386 RO_nsr_id=None, RO_scaling_info=None):
2387 if not db_nslcmop:
2388 return self.SUBOPERATION_STATUS_NOT_FOUND
2389 # Get the "_admin.operations" list, if it exists
2390 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2391 op_list = db_nslcmop_admin.get('operations')
2392 # Create or append to the "_admin.operations" list
2393 new_op = {'member_vnf_index': vnf_index,
2394 'vdu_id': vdu_id,
2395 'vdu_count_index': vdu_count_index,
2396 'primitive': primitive,
2397 'primitive_params': mapped_primitive_params}
2398 if operationState:
2399 new_op['operationState'] = operationState
2400 if detailed_status:
2401 new_op['detailed-status'] = detailed_status
2402 if operationType:
2403 new_op['lcmOperationType'] = operationType
2404 if RO_nsr_id:
2405 new_op['RO_nsr_id'] = RO_nsr_id
2406 if RO_scaling_info:
2407 new_op['RO_scaling_info'] = RO_scaling_info
2408 if not op_list:
2409 # No existing operations, create key 'operations' with current operation as first list element
2410 db_nslcmop_admin.update({'operations': [new_op]})
2411 op_list = db_nslcmop_admin.get('operations')
2412 else:
2413 # Existing operations, append operation to list
2414 op_list.append(new_op)
2415
2416 db_nslcmop_update = {'_admin.operations': op_list}
2417 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2418 op_index = len(op_list) - 1
2419 return op_index
2420
2421 # Helper methods for scale() sub-operations
2422
2423 # pre-scale/post-scale:
2424 # Check for 3 different cases:
2425 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2426 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2427 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2428 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2429 operationType, RO_nsr_id=None, RO_scaling_info=None):
2430 # Find this sub-operation
2431 if (RO_nsr_id and RO_scaling_info):
2432 operationType = 'SCALE-RO'
2433 match = {
2434 'member_vnf_index': vnf_index,
2435 'RO_nsr_id': RO_nsr_id,
2436 'RO_scaling_info': RO_scaling_info,
2437 }
2438 else:
2439 match = {
2440 'member_vnf_index': vnf_index,
2441 'primitive': vnf_config_primitive,
2442 'primitive_params': primitive_params,
2443 'lcmOperationType': operationType
2444 }
2445 op_index = self._find_suboperation(db_nslcmop, match)
2446 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2447 # a. New sub-operation
2448 # The sub-operation does not exist, add it.
2449 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2450 # The following parameters are set to None for all kind of scaling:
2451 vdu_id = None
2452 vdu_count_index = None
2453 vdu_name = None
2454 if RO_nsr_id and RO_scaling_info:
2455 vnf_config_primitive = None
2456 primitive_params = None
2457 else:
2458 RO_nsr_id = None
2459 RO_scaling_info = None
2460 # Initial status for sub-operation
2461 operationState = 'PROCESSING'
2462 detailed_status = 'In progress'
2463 # Add sub-operation for pre/post-scaling (zero or more operations)
2464 self._add_suboperation(db_nslcmop,
2465 vnf_index,
2466 vdu_id,
2467 vdu_count_index,
2468 vdu_name,
2469 vnf_config_primitive,
2470 primitive_params,
2471 operationState,
2472 detailed_status,
2473 operationType,
2474 RO_nsr_id,
2475 RO_scaling_info)
2476 return self.SUBOPERATION_STATUS_NEW
2477 else:
2478 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2479 # or op_index (operationState != 'COMPLETED')
2480 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2481
2482 # Function to return execution_environment id
2483
2484 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2485 # TODO vdu_index_count
2486 for vca in vca_deployed_list:
2487 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2488 return vca["ee_id"]
2489
2490 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
2491 """
2492 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2493 :param logging_text:
2494 :param db_nslcmop:
2495 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2496 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2497 :param vca_index: index in the database _admin.deployed.VCA
2498 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2499 :return: None or exception
2500 """
2501 # execute terminate_primitives
2502 terminate_primitives = config_descriptor.get("terminate-config-primitive")
2503 vdu_id = vca_deployed.get("vdu_id")
2504 vdu_count_index = vca_deployed.get("vdu_count_index")
2505 vdu_name = vca_deployed.get("vdu_name")
2506 vnf_index = vca_deployed.get("member-vnf-index")
2507 if terminate_primitives and vca_deployed.get("needed_terminate"):
2508 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2509 terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
2510 for seq in terminate_primitives:
2511 # For each sequence in list, get primitive and call _ns_execute_primitive()
2512 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2513 vnf_index, seq.get("name"))
2514 self.logger.debug(logging_text + step)
2515 # Create the primitive for each sequence, i.e. "primitive": "touch"
2516 primitive = seq.get('name')
2517 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2518 # The following 3 parameters are currently set to None for 'terminate':
2519 # vdu_id, vdu_count_index, vdu_name
2520
2521 # Add sub-operation
2522 self._add_suboperation(db_nslcmop,
2523 vnf_index,
2524 vdu_id,
2525 vdu_count_index,
2526 vdu_name,
2527 primitive,
2528 mapped_primitive_params)
2529 # Sub-operations: Call _ns_execute_primitive() instead of action()
2530 try:
2531 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2532 mapped_primitive_params)
2533 except LcmException:
2534 # this happens when VCA is not deployed. In this case it is not needed to terminate
2535 continue
2536 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2537 if result not in result_ok:
2538 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2539 "error {}".format(seq.get("name"), vnf_index, result_detail))
2540 # set that this VCA do not need terminated
2541 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2542 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2543
2544 if destroy_ee:
2545 await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
2546
2547 async def _delete_all_N2VC(self, db_nsr: dict):
2548 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2549 namespace = "." + db_nsr["_id"]
2550 try:
2551 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2552 except N2VCNotFound: # already deleted. Skip
2553 pass
2554 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2555
2556 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2557 """
2558 Terminates a deployment from RO
2559 :param logging_text:
2560 :param nsr_deployed: db_nsr._admin.deployed
2561 :param nsr_id:
2562 :param nslcmop_id:
2563 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2564 this method will update only the index 2, but it will write on database the concatenated content of the list
2565 :return:
2566 """
2567 db_nsr_update = {}
2568 failed_detail = []
2569 ro_nsr_id = ro_delete_action = None
2570 if nsr_deployed and nsr_deployed.get("RO"):
2571 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2572 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2573 try:
2574 if ro_nsr_id:
2575 stage[2] = "Deleting ns from VIM."
2576 db_nsr_update["detailed-status"] = " ".join(stage)
2577 self._write_op_status(nslcmop_id, stage)
2578 self.logger.debug(logging_text + stage[2])
2579 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2580 self._write_op_status(nslcmop_id, stage)
2581 desc = await self.RO.delete("ns", ro_nsr_id)
2582 ro_delete_action = desc["action_id"]
2583 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2584 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2585 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2586 if ro_delete_action:
2587 # wait until NS is deleted from VIM
2588 stage[2] = "Waiting ns deleted from VIM."
2589 detailed_status_old = None
2590 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2591 ro_delete_action))
2592 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2593 self._write_op_status(nslcmop_id, stage)
2594
2595 delete_timeout = 20 * 60 # 20 minutes
2596 while delete_timeout > 0:
2597 desc = await self.RO.show(
2598 "ns",
2599 item_id_name=ro_nsr_id,
2600 extra_item="action",
2601 extra_item_id=ro_delete_action)
2602
2603 # deploymentStatus
2604 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2605
2606 ns_status, ns_status_info = self.RO.check_action_status(desc)
2607 if ns_status == "ERROR":
2608 raise ROclient.ROClientException(ns_status_info)
2609 elif ns_status == "BUILD":
2610 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2611 elif ns_status == "ACTIVE":
2612 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2613 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2614 break
2615 else:
2616 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2617 if stage[2] != detailed_status_old:
2618 detailed_status_old = stage[2]
2619 db_nsr_update["detailed-status"] = " ".join(stage)
2620 self._write_op_status(nslcmop_id, stage)
2621 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2622 await asyncio.sleep(5, loop=self.loop)
2623 delete_timeout -= 5
2624 else: # delete_timeout <= 0:
2625 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2626
2627 except Exception as e:
2628 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2629 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2630 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2631 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2632 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2633 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2634 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2635 failed_detail.append("delete conflict: {}".format(e))
2636 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2637 else:
2638 failed_detail.append("delete error: {}".format(e))
2639 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2640
2641 # Delete nsd
2642 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2643 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2644 try:
2645 stage[2] = "Deleting nsd from RO."
2646 db_nsr_update["detailed-status"] = " ".join(stage)
2647 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2648 self._write_op_status(nslcmop_id, stage)
2649 await self.RO.delete("nsd", ro_nsd_id)
2650 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
2651 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2652 except Exception as e:
2653 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2654 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2655 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
2656 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2657 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
2658 self.logger.debug(logging_text + failed_detail[-1])
2659 else:
2660 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
2661 self.logger.error(logging_text + failed_detail[-1])
2662
2663 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
2664 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2665 if not vnf_deployed or not vnf_deployed["id"]:
2666 continue
2667 try:
2668 ro_vnfd_id = vnf_deployed["id"]
2669 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2670 vnf_deployed["member-vnf-index"], ro_vnfd_id)
2671 db_nsr_update["detailed-status"] = " ".join(stage)
2672 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2673 self._write_op_status(nslcmop_id, stage)
2674 await self.RO.delete("vnfd", ro_vnfd_id)
2675 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
2676 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2677 except Exception as e:
2678 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2679 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2680 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
2681 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2682 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
2683 self.logger.debug(logging_text + failed_detail[-1])
2684 else:
2685 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
2686 self.logger.error(logging_text + failed_detail[-1])
2687
2688 if failed_detail:
2689 stage[2] = "Error deleting from VIM"
2690 else:
2691 stage[2] = "Deleted from VIM"
2692 db_nsr_update["detailed-status"] = " ".join(stage)
2693 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2694 self._write_op_status(nslcmop_id, stage)
2695
2696 if failed_detail:
2697 raise LcmException("; ".join(failed_detail))
2698
2699 async def terminate(self, nsr_id, nslcmop_id):
2700 # Try to lock HA task here
2701 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2702 if not task_is_locked_by_me:
2703 return
2704
2705 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2706 self.logger.debug(logging_text + "Enter")
2707 timeout_ns_terminate = self.timeout_ns_terminate
2708 db_nsr = None
2709 db_nslcmop = None
2710 exc = None
2711 error_list = [] # annotates all failed error messages
2712 db_nslcmop_update = {}
2713 autoremove = False # autoremove after terminated
2714 tasks_dict_info = {}
2715 db_nsr_update = {}
2716 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2717 # ^ contains [stage, step, VIM-status]
2718 try:
2719 # wait for any previous tasks in process
2720 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2721
2722 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2723 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2724 operation_params = db_nslcmop.get("operationParams") or {}
2725 if operation_params.get("timeout_ns_terminate"):
2726 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
2727 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2728 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2729
2730 db_nsr_update["operational-status"] = "terminating"
2731 db_nsr_update["config-status"] = "terminating"
2732 self._write_ns_status(
2733 nsr_id=nsr_id,
2734 ns_state="TERMINATING",
2735 current_operation="TERMINATING",
2736 current_operation_id=nslcmop_id,
2737 other_update=db_nsr_update
2738 )
2739 self._write_op_status(
2740 op_id=nslcmop_id,
2741 queuePosition=0,
2742 stage=stage
2743 )
2744 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
2745 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2746 return
2747
2748 stage[1] = "Getting vnf descriptors from db."
2749 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2750 db_vnfds_from_id = {}
2751 db_vnfds_from_member_index = {}
2752 # Loop over VNFRs
2753 for vnfr in db_vnfrs_list:
2754 vnfd_id = vnfr["vnfd-id"]
2755 if vnfd_id not in db_vnfds_from_id:
2756 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2757 db_vnfds_from_id[vnfd_id] = vnfd
2758 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
2759
2760 # Destroy individual execution environments when there are terminating primitives.
2761 # Rest of EE will be deleted at once
2762 if not operation_params.get("skip_terminate_primitives"):
2763 stage[0] = "Stage 2/3 execute terminating primitives."
2764 stage[1] = "Looking execution environment that needs terminate."
2765 self.logger.debug(logging_text + stage[1])
2766 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
2767 config_descriptor = None
2768 if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
2769 continue
2770 if not vca.get("member-vnf-index"):
2771 # ns
2772 config_descriptor = db_nsr.get("ns-configuration")
2773 elif vca.get("vdu_id"):
2774 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
2775 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
2776 if vdud:
2777 config_descriptor = vdud.get("vdu-configuration")
2778 elif vca.get("kdu_name"):
2779 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
2780 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
2781 if kdud:
2782 config_descriptor = kdud.get("kdu-configuration")
2783 else:
2784 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
2785 task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
2786 vca_index, False))
2787 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
2788
2789 # wait for pending tasks of terminate primitives
2790 if tasks_dict_info:
2791 self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
2792 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
2793 min(self.timeout_charm_delete, timeout_ns_terminate),
2794 stage, nslcmop_id)
2795 if error_list:
2796 return # raise LcmException("; ".join(error_list))
2797 tasks_dict_info.clear()
2798
2799 # remove All execution environments at once
2800 stage[0] = "Stage 3/3 delete all."
2801
2802 if nsr_deployed.get("VCA"):
2803 stage[1] = "Deleting all execution environments."
2804 self.logger.debug(logging_text + stage[1])
2805 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
2806 timeout=self.timeout_charm_delete))
2807 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2808 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
2809
2810 # Delete from k8scluster
2811 stage[1] = "Deleting KDUs."
2812 self.logger.debug(logging_text + stage[1])
2813 # print(nsr_deployed)
2814 for kdu in get_iterable(nsr_deployed, "K8s"):
2815 if not kdu or not kdu.get("kdu-instance"):
2816 continue
2817 kdu_instance = kdu.get("kdu-instance")
2818 if kdu.get("k8scluster-type") in self.k8scluster_map:
2819 task_delete_kdu_instance = asyncio.ensure_future(
2820 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
2821 cluster_uuid=kdu.get("k8scluster-uuid"),
2822 kdu_instance=kdu_instance))
2823 else:
2824 self.logger.error(logging_text + "Unknown k8s deployment type {}".
2825 format(kdu.get("k8scluster-type")))
2826 continue
2827 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
2828
2829 # remove from RO
2830 stage[1] = "Deleting ns from VIM."
2831 task_delete_ro = asyncio.ensure_future(
2832 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
2833 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
2834
2835 # rest of staff will be done at finally
2836
2837 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2838 self.logger.error(logging_text + "Exit Exception {}".format(e))
2839 exc = e
2840 except asyncio.CancelledError:
2841 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2842 exc = "Operation was cancelled"
2843 except Exception as e:
2844 exc = traceback.format_exc()
2845 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2846 finally:
2847 if exc:
2848 error_list.append(str(exc))
2849 try:
2850 # wait for pending tasks
2851 if tasks_dict_info:
2852 stage[1] = "Waiting for terminate pending tasks."
2853 self.logger.debug(logging_text + stage[1])
2854 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
2855 stage, nslcmop_id)
2856 stage[1] = stage[2] = ""
2857 except asyncio.CancelledError:
2858 error_list.append("Cancelled")
2859 # TODO cancell all tasks
2860 except Exception as exc:
2861 error_list.append(str(exc))
2862 # update status at database
2863 if error_list:
2864 error_detail = "; ".join(error_list)
2865 # self.logger.error(logging_text + error_detail)
2866 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
2867 error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])
2868
2869 db_nsr_update["operational-status"] = "failed"
2870 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2871 db_nslcmop_update["detailed-status"] = error_detail
2872 nslcmop_operation_state = "FAILED"
2873 ns_state = "BROKEN"
2874 else:
2875 error_detail = None
2876 error_description_nsr = error_description_nslcmop = None
2877 ns_state = "NOT_INSTANTIATED"
2878 db_nsr_update["operational-status"] = "terminated"
2879 db_nsr_update["detailed-status"] = "Done"
2880 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2881 db_nslcmop_update["detailed-status"] = "Done"
2882 nslcmop_operation_state = "COMPLETED"
2883
2884 if db_nsr:
2885 self._write_ns_status(
2886 nsr_id=nsr_id,
2887 ns_state=ns_state,
2888 current_operation="IDLE",
2889 current_operation_id=None,
2890 error_description=error_description_nsr,
2891 error_detail=error_detail,
2892 other_update=db_nsr_update
2893 )
2894 if db_nslcmop:
2895 self._write_op_status(
2896 op_id=nslcmop_id,
2897 stage="",
2898 error_message=error_description_nslcmop,
2899 operation_state=nslcmop_operation_state,
2900 other_update=db_nslcmop_update,
2901 )
2902 autoremove = operation_params.get("autoremove", False)
2903 if nslcmop_operation_state:
2904 try:
2905 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2906 "operationState": nslcmop_operation_state,
2907 "autoremove": autoremove},
2908 loop=self.loop)
2909 except Exception as e:
2910 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2911
2912 self.logger.debug(logging_text + "Exit")
2913 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2914
2915 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
2916 time_start = time()
2917 error_detail_list = []
2918 error_list = []
2919 pending_tasks = list(created_tasks_info.keys())
2920 num_tasks = len(pending_tasks)
2921 num_done = 0
2922 stage[1] = "{}/{}.".format(num_done, num_tasks)
2923 self._write_op_status(nslcmop_id, stage)
2924 while pending_tasks:
2925 new_error = None
2926 _timeout = timeout + time_start - time()
2927 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
2928 return_when=asyncio.FIRST_COMPLETED)
2929 num_done += len(done)
2930 if not done: # Timeout
2931 for task in pending_tasks:
2932 new_error = created_tasks_info[task] + ": Timeout"
2933 error_detail_list.append(new_error)
2934 error_list.append(new_error)
2935 break
2936 for task in done:
2937 if task.cancelled():
2938 exc = "Cancelled"
2939 else:
2940 exc = task.exception()
2941 if exc:
2942 if isinstance(exc, asyncio.TimeoutError):
2943 exc = "Timeout"
2944 new_error = created_tasks_info[task] + ": {}".format(exc)
2945 error_list.append(created_tasks_info[task])
2946 error_detail_list.append(new_error)
2947 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
2948 self.logger.error(logging_text + new_error)
2949 else:
2950 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
2951 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
2952 else:
2953 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
2954 stage[1] = "{}/{}.".format(num_done, num_tasks)
2955 if new_error:
2956 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
2957 if nsr_id: # update also nsr
2958 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
2959 "errorDetail": ". ".join(error_detail_list)})
2960 self._write_op_status(nslcmop_id, stage)
2961 return error_detail_list
2962
2963 @staticmethod
2964 def _map_primitive_params(primitive_desc, params, instantiation_params):
2965 """
2966 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2967 The default-value is used. If it is between < > it look for a value at instantiation_params
2968 :param primitive_desc: portion of VNFD/NSD that describes primitive
2969 :param params: Params provided by user
2970 :param instantiation_params: Instantiation params provided by user
2971 :return: a dictionary with the calculated params
2972 """
2973 calculated_params = {}
2974 for parameter in primitive_desc.get("parameter", ()):
2975 param_name = parameter["name"]
2976 if param_name in params:
2977 calculated_params[param_name] = params[param_name]
2978 elif "default-value" in parameter or "value" in parameter:
2979 if "value" in parameter:
2980 calculated_params[param_name] = parameter["value"]
2981 else:
2982 calculated_params[param_name] = parameter["default-value"]
2983 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2984 and calculated_params[param_name].endswith(">"):
2985 if calculated_params[param_name][1:-1] in instantiation_params:
2986 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2987 else:
2988 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2989 format(calculated_params[param_name], primitive_desc["name"]))
2990 else:
2991 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2992 format(param_name, primitive_desc["name"]))
2993
2994 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2995 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2996 width=256)
2997 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2998 calculated_params[param_name] = calculated_params[param_name][7:]
2999
3000 # add always ns_config_info if primitive name is config
3001 if primitive_desc["name"] == "config":
3002 if "ns_config_info" in instantiation_params:
3003 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3004 return calculated_params
3005
3006 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
3007 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3008 for vca in deployed_vca:
3009 if not vca:
3010 continue
3011 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3012 continue
3013 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3014 continue
3015 if kdu_name and kdu_name != vca["kdu_name"]:
3016 continue
3017 break
3018 else:
3019 # vca_deployed not found
3020 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3021 "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3022
3023 # get ee_id
3024 ee_id = vca.get("ee_id")
3025 if not ee_id:
3026 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3027 "execution environment"
3028 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3029 return ee_id
3030
3031 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3032 retries_interval=30, timeout=None) -> (str, str):
3033 try:
3034 if primitive == "config":
3035 primitive_params = {"params": primitive_params}
3036
3037 while retries >= 0:
3038 try:
3039 output = await asyncio.wait_for(
3040 self.n2vc.exec_primitive(
3041 ee_id=ee_id,
3042 primitive_name=primitive,
3043 params_dict=primitive_params,
3044 progress_timeout=self.timeout_progress_primitive,
3045 total_timeout=self.timeout_primitive),
3046 timeout=timeout or self.timeout_primitive)
3047 # execution was OK
3048 break
3049 except asyncio.CancelledError:
3050 raise
3051 except Exception as e: # asyncio.TimeoutError
3052 if isinstance(e, asyncio.TimeoutError):
3053 e = "Timeout"
3054 retries -= 1
3055 if retries >= 0:
3056 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3057 # wait and retry
3058 await asyncio.sleep(retries_interval, loop=self.loop)
3059 else:
3060 return 'FAILED', str(e)
3061
3062 return 'COMPLETED', output
3063
3064 except (LcmException, asyncio.CancelledError):
3065 raise
3066 except Exception as e:
3067 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3068
3069 async def action(self, nsr_id, nslcmop_id):
3070
3071 # Try to lock HA task here
3072 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3073 if not task_is_locked_by_me:
3074 return
3075
3076 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3077 self.logger.debug(logging_text + "Enter")
3078 # get all needed from database
3079 db_nsr = None
3080 db_nslcmop = None
3081 db_nsr_update = {}
3082 db_nslcmop_update = {}
3083 nslcmop_operation_state = None
3084 error_description_nslcmop = None
3085 exc = None
3086 try:
3087 # wait for any previous tasks in process
3088 step = "Waiting for previous operations to terminate"
3089 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3090
3091 self._write_ns_status(
3092 nsr_id=nsr_id,
3093 ns_state=None,
3094 current_operation="RUNNING ACTION",
3095 current_operation_id=nslcmop_id
3096 )
3097
3098 step = "Getting information from database"
3099 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3100 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3101
3102 nsr_deployed = db_nsr["_admin"].get("deployed")
3103 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3104 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3105 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3106 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3107 primitive = db_nslcmop["operationParams"]["primitive"]
3108 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3109 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3110
3111 if vnf_index:
3112 step = "Getting vnfr from database"
3113 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3114 step = "Getting vnfd from database"
3115 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3116 else:
3117 step = "Getting nsd from database"
3118 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3119
3120 # for backward compatibility
3121 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3122 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3123 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3124 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3125
3126 # look for primitive
3127 config_primitive_desc = None
3128 if vdu_id:
3129 for vdu in get_iterable(db_vnfd, "vdu"):
3130 if vdu_id == vdu["id"]:
3131 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3132 if config_primitive["name"] == primitive:
3133 config_primitive_desc = config_primitive
3134 break
3135 break
3136 elif kdu_name:
3137 for kdu in get_iterable(db_vnfd, "kdu"):
3138 if kdu_name == kdu["name"]:
3139 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3140 if config_primitive["name"] == primitive:
3141 config_primitive_desc = config_primitive
3142 break
3143 break
3144 elif vnf_index:
3145 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3146 if config_primitive["name"] == primitive:
3147 config_primitive_desc = config_primitive
3148 break
3149 else:
3150 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3151 if config_primitive["name"] == primitive:
3152 config_primitive_desc = config_primitive
3153 break
3154
3155 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3156 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3157 format(primitive))
3158
3159 if vnf_index:
3160 if vdu_id:
3161 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3162 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3163 elif kdu_name:
3164 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3165 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3166 else:
3167 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3168 else:
3169 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3170
3171 # TODO check if ns is in a proper status
3172 if kdu_name and primitive in ("upgrade", "rollback", "status"):
3173 # kdur and desc_params already set from before
3174 if primitive_params:
3175 desc_params.update(primitive_params)
3176 # TODO Check if we will need something at vnf level
3177 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3178 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3179 break
3180 else:
3181 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3182
3183 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3184 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3185 raise LcmException(msg)
3186
3187 db_dict = {"collection": "nsrs",
3188 "filter": {"_id": nsr_id},
3189 "path": "_admin.deployed.K8s.{}".format(index)}
3190 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3191 step = "Executing kdu {}".format(primitive)
3192 if primitive == "upgrade":
3193 if desc_params.get("kdu_model"):
3194 kdu_model = desc_params.get("kdu_model")
3195 del desc_params["kdu_model"]
3196 else:
3197 kdu_model = kdu.get("kdu-model")
3198 parts = kdu_model.split(sep=":")
3199 if len(parts) == 2:
3200 kdu_model = parts[0]
3201
3202 detailed_status = await asyncio.wait_for(
3203 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3204 cluster_uuid=kdu.get("k8scluster-uuid"),
3205 kdu_instance=kdu.get("kdu-instance"),
3206 atomic=True, kdu_model=kdu_model,
3207 params=desc_params, db_dict=db_dict,
3208 timeout=timeout_ns_action),
3209 timeout=timeout_ns_action + 10)
3210 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3211 elif primitive == "rollback":
3212 detailed_status = await asyncio.wait_for(
3213 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3214 cluster_uuid=kdu.get("k8scluster-uuid"),
3215 kdu_instance=kdu.get("kdu-instance"),
3216 db_dict=db_dict),
3217 timeout=timeout_ns_action)
3218 elif primitive == "status":
3219 detailed_status = await asyncio.wait_for(
3220 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3221 cluster_uuid=kdu.get("k8scluster-uuid"),
3222 kdu_instance=kdu.get("kdu-instance")),
3223 timeout=timeout_ns_action)
3224
3225 if detailed_status:
3226 nslcmop_operation_state = 'COMPLETED'
3227 else:
3228 detailed_status = ''
3229 nslcmop_operation_state = 'FAILED'
3230
3231 else:
3232 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3233 self._look_for_deployed_vca(nsr_deployed["VCA"],
3234 member_vnf_index=vnf_index,
3235 vdu_id=vdu_id,
3236 vdu_count_index=vdu_count_index),
3237 primitive=primitive,
3238 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3239 timeout=timeout_ns_action)
3240
3241 db_nslcmop_update["detailed-status"] = detailed_status
3242 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3243 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3244 detailed_status))
3245 return # database update is called inside finally
3246
3247 except (DbException, LcmException, N2VCException, K8sException) as e:
3248 self.logger.error(logging_text + "Exit Exception {}".format(e))
3249 exc = e
3250 except asyncio.CancelledError:
3251 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3252 exc = "Operation was cancelled"
3253 except asyncio.TimeoutError:
3254 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3255 exc = "Timeout"
3256 except Exception as e:
3257 exc = traceback.format_exc()
3258 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3259 finally:
3260 if exc:
3261 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3262 "FAILED {}: {}".format(step, exc)
3263 nslcmop_operation_state = "FAILED"
3264 if db_nsr:
3265 self._write_ns_status(
3266 nsr_id=nsr_id,
3267 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3268 current_operation="IDLE",
3269 current_operation_id=None,
3270 # error_description=error_description_nsr,
3271 # error_detail=error_detail,
3272 other_update=db_nsr_update
3273 )
3274
3275 if db_nslcmop:
3276 self._write_op_status(
3277 op_id=nslcmop_id,
3278 stage="",
3279 error_message=error_description_nslcmop,
3280 operation_state=nslcmop_operation_state,
3281 other_update=db_nslcmop_update,
3282 )
3283
3284 if nslcmop_operation_state:
3285 try:
3286 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3287 "operationState": nslcmop_operation_state},
3288 loop=self.loop)
3289 except Exception as e:
3290 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3291 self.logger.debug(logging_text + "Exit")
3292 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3293 return nslcmop_operation_state, detailed_status
3294
3295 async def scale(self, nsr_id, nslcmop_id):
3296
3297 # Try to lock HA task here
3298 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3299 if not task_is_locked_by_me:
3300 return
3301
3302 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3303 self.logger.debug(logging_text + "Enter")
3304 # get all needed from database
3305 db_nsr = None
3306 db_nslcmop = None
3307 db_nslcmop_update = {}
3308 nslcmop_operation_state = None
3309 db_nsr_update = {}
3310 exc = None
3311 # in case of error, indicates what part of scale was failed to put nsr at error status
3312 scale_process = None
3313 old_operational_status = ""
3314 old_config_status = ""
3315 vnfr_scaled = False
3316 try:
3317 # wait for any previous tasks in process
3318 step = "Waiting for previous operations to terminate"
3319 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3320
3321 self._write_ns_status(
3322 nsr_id=nsr_id,
3323 ns_state=None,
3324 current_operation="SCALING",
3325 current_operation_id=nslcmop_id
3326 )
3327
3328 step = "Getting nslcmop from database"
3329 self.logger.debug(step + " after having waited for previous tasks to be completed")
3330 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3331 step = "Getting nsr from database"
3332 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3333
3334 old_operational_status = db_nsr["operational-status"]
3335 old_config_status = db_nsr["config-status"]
3336 step = "Parsing scaling parameters"
3337 # self.logger.debug(step)
3338 db_nsr_update["operational-status"] = "scaling"
3339 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3340 nsr_deployed = db_nsr["_admin"].get("deployed")
3341
3342 #######
3343 nsr_deployed = db_nsr["_admin"].get("deployed")
3344 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3345 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3346 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3347 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3348 #######
3349
3350 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3351 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3352 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3353 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3354 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3355
3356 # for backward compatibility
3357 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3358 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3359 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3360 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3361
3362 step = "Getting vnfr from database"
3363 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3364 step = "Getting vnfd from database"
3365 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3366
3367 step = "Getting scaling-group-descriptor"
3368 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3369 if scaling_descriptor["name"] == scaling_group:
3370 break
3371 else:
3372 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3373 "at vnfd:scaling-group-descriptor".format(scaling_group))
3374
3375 # cooldown_time = 0
3376 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3377 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3378 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3379 # break
3380
3381 # TODO check if ns is in a proper status
3382 step = "Sending scale order to VIM"
3383 nb_scale_op = 0
3384 if not db_nsr["_admin"].get("scaling-group"):
3385 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3386 admin_scale_index = 0
3387 else:
3388 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3389 if admin_scale_info["name"] == scaling_group:
3390 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3391 break
3392 else: # not found, set index one plus last element and add new entry with the name
3393 admin_scale_index += 1
3394 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3395 RO_scaling_info = []
3396 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3397 if scaling_type == "SCALE_OUT":
3398 # count if max-instance-count is reached
3399 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3400 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3401 if nb_scale_op >= max_instance_count:
3402 raise LcmException("reached the limit of {} (max-instance-count) "
3403 "scaling-out operations for the "
3404 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3405
3406 nb_scale_op += 1
3407 vdu_scaling_info["scaling_direction"] = "OUT"
3408 vdu_scaling_info["vdu-create"] = {}
3409 for vdu_scale_info in scaling_descriptor["vdu"]:
3410 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3411 "type": "create", "count": vdu_scale_info.get("count", 1)})
3412 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3413
3414 elif scaling_type == "SCALE_IN":
3415 # count if min-instance-count is reached
3416 min_instance_count = 0
3417 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3418 min_instance_count = int(scaling_descriptor["min-instance-count"])
3419 if nb_scale_op <= min_instance_count:
3420 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3421 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3422 nb_scale_op -= 1
3423 vdu_scaling_info["scaling_direction"] = "IN"
3424 vdu_scaling_info["vdu-delete"] = {}
3425 for vdu_scale_info in scaling_descriptor["vdu"]:
3426 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3427 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3428 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3429
3430 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3431 vdu_create = vdu_scaling_info.get("vdu-create")
3432 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3433 if vdu_scaling_info["scaling_direction"] == "IN":
3434 for vdur in reversed(db_vnfr["vdur"]):
3435 if vdu_delete.get(vdur["vdu-id-ref"]):
3436 vdu_delete[vdur["vdu-id-ref"]] -= 1
3437 vdu_scaling_info["vdu"].append({
3438 "name": vdur["name"],
3439 "vdu_id": vdur["vdu-id-ref"],
3440 "interface": []
3441 })
3442 for interface in vdur["interfaces"]:
3443 vdu_scaling_info["vdu"][-1]["interface"].append({
3444 "name": interface["name"],
3445 "ip_address": interface["ip-address"],
3446 "mac_address": interface.get("mac-address"),
3447 })
3448 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3449
3450 # PRE-SCALE BEGIN
3451 step = "Executing pre-scale vnf-config-primitive"
3452 if scaling_descriptor.get("scaling-config-action"):
3453 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3454 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3455 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3456 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3457 step = db_nslcmop_update["detailed-status"] = \
3458 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3459
3460 # look for primitive
3461 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3462 if config_primitive["name"] == vnf_config_primitive:
3463 break
3464 else:
3465 raise LcmException(
3466 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3467 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3468 "primitive".format(scaling_group, config_primitive))
3469
3470 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3471 if db_vnfr.get("additionalParamsForVnf"):
3472 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3473
3474 scale_process = "VCA"
3475 db_nsr_update["config-status"] = "configuring pre-scaling"
3476 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3477
3478 # Pre-scale reintent check: Check if this sub-operation has been executed before
3479 op_index = self._check_or_add_scale_suboperation(
3480 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3481 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3482 # Skip sub-operation
3483 result = 'COMPLETED'
3484 result_detail = 'Done'
3485 self.logger.debug(logging_text +
3486 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3487 vnf_config_primitive, result, result_detail))
3488 else:
3489 if (op_index == self.SUBOPERATION_STATUS_NEW):
3490 # New sub-operation: Get index of this sub-operation
3491 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3492 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3493 format(vnf_config_primitive))
3494 else:
3495 # Reintent: Get registered params for this existing sub-operation
3496 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3497 vnf_index = op.get('member_vnf_index')
3498 vnf_config_primitive = op.get('primitive')
3499 primitive_params = op.get('primitive_params')
3500 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3501 format(vnf_config_primitive))
3502 # Execute the primitive, either with new (first-time) or registered (reintent) args
3503 result, result_detail = await self._ns_execute_primitive(
3504 self._look_for_deployed_vca(nsr_deployed["VCA"],
3505 member_vnf_index=vnf_index,
3506 vdu_id=None,
3507 vdu_count_index=None),
3508 vnf_config_primitive, primitive_params)
3509 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3510 vnf_config_primitive, result, result_detail))
3511 # Update operationState = COMPLETED | FAILED
3512 self._update_suboperation_status(
3513 db_nslcmop, op_index, result, result_detail)
3514
3515 if result == "FAILED":
3516 raise LcmException(result_detail)
3517 db_nsr_update["config-status"] = old_config_status
3518 scale_process = None
3519 # PRE-SCALE END
3520
3521 # SCALE RO - BEGIN
3522 # Should this block be skipped if 'RO_nsr_id' == None ?
3523 # if (RO_nsr_id and RO_scaling_info):
3524 if RO_scaling_info:
3525 scale_process = "RO"
3526 # Scale RO reintent check: Check if this sub-operation has been executed before
3527 op_index = self._check_or_add_scale_suboperation(
3528 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3529 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3530 # Skip sub-operation
3531 result = 'COMPLETED'
3532 result_detail = 'Done'
3533 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3534 result, result_detail))
3535 else:
3536 if (op_index == self.SUBOPERATION_STATUS_NEW):
3537 # New sub-operation: Get index of this sub-operation
3538 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3539 self.logger.debug(logging_text + "New sub-operation RO")
3540 else:
3541 # Reintent: Get registered params for this existing sub-operation
3542 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3543 RO_nsr_id = op.get('RO_nsr_id')
3544 RO_scaling_info = op.get('RO_scaling_info')
3545 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3546 vnf_config_primitive))
3547
3548 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3549 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3550 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3551 # wait until ready
3552 RO_nslcmop_id = RO_desc["instance_action_id"]
3553 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3554
3555 RO_task_done = False
3556 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3557 detailed_status_old = None
3558 self.logger.debug(logging_text + step)
3559
3560 deployment_timeout = 1 * 3600 # One hour
3561 while deployment_timeout > 0:
3562 if not RO_task_done:
3563 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3564 extra_item_id=RO_nslcmop_id)
3565
3566 # deploymentStatus
3567 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3568
3569 ns_status, ns_status_info = self.RO.check_action_status(desc)
3570 if ns_status == "ERROR":
3571 raise ROclient.ROClientException(ns_status_info)
3572 elif ns_status == "BUILD":
3573 detailed_status = step + "; {}".format(ns_status_info)
3574 elif ns_status == "ACTIVE":
3575 RO_task_done = True
3576 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3577 self.logger.debug(logging_text + step)
3578 else:
3579 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3580 else:
3581
3582 if ns_status == "ERROR":
3583 raise ROclient.ROClientException(ns_status_info)
3584 elif ns_status == "BUILD":
3585 detailed_status = step + "; {}".format(ns_status_info)
3586 elif ns_status == "ACTIVE":
3587 step = detailed_status = \
3588 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3589 if not vnfr_scaled:
3590 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3591 vnfr_scaled = True
3592 try:
3593 desc = await self.RO.show("ns", RO_nsr_id)
3594
3595 # deploymentStatus
3596 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3597
3598 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3599 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3600 break
3601 except LcmExceptionNoMgmtIP:
3602 pass
3603 else:
3604 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3605 if detailed_status != detailed_status_old:
3606 self._update_suboperation_status(
3607 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3608 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3609 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3610
3611 await asyncio.sleep(5, loop=self.loop)
3612 deployment_timeout -= 5
3613 if deployment_timeout <= 0:
3614 self._update_suboperation_status(
3615 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3616 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3617
3618 # update VDU_SCALING_INFO with the obtained ip_addresses
3619 if vdu_scaling_info["scaling_direction"] == "OUT":
3620 for vdur in reversed(db_vnfr["vdur"]):
3621 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3622 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3623 vdu_scaling_info["vdu"].append({
3624 "name": vdur["name"],
3625 "vdu_id": vdur["vdu-id-ref"],
3626 "interface": []
3627 })
3628 for interface in vdur["interfaces"]:
3629 vdu_scaling_info["vdu"][-1]["interface"].append({
3630 "name": interface["name"],
3631 "ip_address": interface["ip-address"],
3632 "mac_address": interface.get("mac-address"),
3633 })
3634 del vdu_scaling_info["vdu-create"]
3635
3636 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3637 # SCALE RO - END
3638
3639 scale_process = None
3640 if db_nsr_update:
3641 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3642
3643 # POST-SCALE BEGIN
3644 # execute primitive service POST-SCALING
3645 step = "Executing post-scale vnf-config-primitive"
3646 if scaling_descriptor.get("scaling-config-action"):
3647 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3648 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3649 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3650 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3651 step = db_nslcmop_update["detailed-status"] = \
3652 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3653
3654 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3655 if db_vnfr.get("additionalParamsForVnf"):
3656 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3657
3658 # look for primitive
3659 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3660 if config_primitive["name"] == vnf_config_primitive:
3661 break
3662 else:
3663 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3664 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3665 "match any vnf-configuration:config-primitive".format(scaling_group,
3666 config_primitive))
3667 scale_process = "VCA"
3668 db_nsr_update["config-status"] = "configuring post-scaling"
3669 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3670
3671 # Post-scale reintent check: Check if this sub-operation has been executed before
3672 op_index = self._check_or_add_scale_suboperation(
3673 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3674 if op_index == self.SUBOPERATION_STATUS_SKIP:
3675 # Skip sub-operation
3676 result = 'COMPLETED'
3677 result_detail = 'Done'
3678 self.logger.debug(logging_text +
3679 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3680 format(vnf_config_primitive, result, result_detail))
3681 else:
3682 if op_index == self.SUBOPERATION_STATUS_NEW:
3683 # New sub-operation: Get index of this sub-operation
3684 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3685 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3686 format(vnf_config_primitive))
3687 else:
3688 # Reintent: Get registered params for this existing sub-operation
3689 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3690 vnf_index = op.get('member_vnf_index')
3691 vnf_config_primitive = op.get('primitive')
3692 primitive_params = op.get('primitive_params')
3693 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3694 format(vnf_config_primitive))
3695 # Execute the primitive, either with new (first-time) or registered (reintent) args
3696 result, result_detail = await self._ns_execute_primitive(
3697 self._look_for_deployed_vca(nsr_deployed["VCA"],
3698 member_vnf_index=vnf_index,
3699 vdu_id=None,
3700 vdu_count_index=None),
3701 vnf_config_primitive, primitive_params)
3702 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3703 vnf_config_primitive, result, result_detail))
3704 # Update operationState = COMPLETED | FAILED
3705 self._update_suboperation_status(
3706 db_nslcmop, op_index, result, result_detail)
3707
3708 if result == "FAILED":
3709 raise LcmException(result_detail)
3710 db_nsr_update["config-status"] = old_config_status
3711 scale_process = None
3712 # POST-SCALE END
3713
3714 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3715 db_nslcmop_update["statusEnteredTime"] = time()
3716 db_nslcmop_update["detailed-status"] = "done"
3717 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3718 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3719 else old_operational_status
3720 db_nsr_update["config-status"] = old_config_status
3721 return
3722 except (ROclient.ROClientException, DbException, LcmException) as e:
3723 self.logger.error(logging_text + "Exit Exception {}".format(e))
3724 exc = e
3725 except asyncio.CancelledError:
3726 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3727 exc = "Operation was cancelled"
3728 except Exception as e:
3729 exc = traceback.format_exc()
3730 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3731 finally:
3732 self._write_ns_status(
3733 nsr_id=nsr_id,
3734 ns_state=None,
3735 current_operation="IDLE",
3736 current_operation_id=None
3737 )
3738 if exc:
3739 if db_nslcmop:
3740 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3741 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3742 db_nslcmop_update["statusEnteredTime"] = time()
3743 if db_nsr:
3744 db_nsr_update["operational-status"] = old_operational_status
3745 db_nsr_update["config-status"] = old_config_status
3746 db_nsr_update["detailed-status"] = ""
3747 if scale_process:
3748 if "VCA" in scale_process:
3749 db_nsr_update["config-status"] = "failed"
3750 if "RO" in scale_process:
3751 db_nsr_update["operational-status"] = "failed"
3752 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3753 exc)
3754 try:
3755 if db_nslcmop and db_nslcmop_update:
3756 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3757 if db_nsr:
3758 self._write_ns_status(
3759 nsr_id=nsr_id,
3760 ns_state=None,
3761 current_operation="IDLE",
3762 current_operation_id=None,
3763 other_update=db_nsr_update
3764 )
3765
3766 except DbException as e:
3767 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3768 if nslcmop_operation_state:
3769 try:
3770 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3771 "operationState": nslcmop_operation_state},
3772 loop=self.loop)
3773 # if cooldown_time:
3774 # await asyncio.sleep(cooldown_time, loop=self.loop)
3775 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3776 except Exception as e:
3777 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3778 self.logger.debug(logging_text + "Exit")
3779 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")