2e0ca3981f989c4cf04f69ee4c50f5f9d85f313d
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 def get_iterable(in_dict, in_key):
47 """
48 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
49 :param in_dict: a dictionary
50 :param in_key: the key to look for at in_dict
51 :return: in_dict[in_var] or () if it is None or not present
52 """
53 if not in_dict.get(in_key):
54 return ()
55 return in_dict[in_key]
56
57
58 def populate_dict(target_dict, key_list, value):
59 """
60 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
61 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
62 :param target_dict: dictionary to be changed
63 :param key_list: list of keys to insert at target_dict
64 :param value:
65 :return: None
66 """
67 for key in key_list[0:-1]:
68 if key not in target_dict:
69 target_dict[key] = {}
70 target_dict = target_dict[key]
71 target_dict[key_list[-1]] = value
72
73
74 class NsLcm(LcmBase):
75 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
76 total_deploy_timeout = 2 * 3600 # global timeout for deployment
77 timeout_charm_delete = 10 * 60
78 timeout_primitive = 10 * 60 # timeout for primitive execution
79
80 SUBOPERATION_STATUS_NOT_FOUND = -1
81 SUBOPERATION_STATUS_NEW = -2
82 SUBOPERATION_STATUS_SKIP = -3
83
84 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
85 """
86 Init, Connect to database, filesystem storage, and messaging
87 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
88 :return: None
89 """
90 super().__init__(
91 db=db,
92 msg=msg,
93 fs=fs,
94 logger=logging.getLogger('lcm.ns')
95 )
96
97 self.loop = loop
98 self.lcm_tasks = lcm_tasks
99 self.ro_config = ro_config
100 self.vca_config = vca_config
101 if 'pubkey' in self.vca_config:
102 self.vca_config['public_key'] = self.vca_config['pubkey']
103 if 'cacert' in self.vca_config:
104 self.vca_config['ca_cert'] = self.vca_config['cacert']
105 if 'apiproxy' in self.vca_config:
106 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
107 if 'enableosupgrade' in self.vca_config:
108 if self.vca_config['enableosupgrade'].lower() == 'false':
109 self.vca_config['enable_os_upgrade'] = False
110 elif self.vca_config['enableosupgrade'].lower() == 'true':
111 self.vca_config['enable_os_upgrade'] = True
112 if 'aptmirror' in self.vca_config:
113 self.vca_config['apt_mirror'] = self.vca_config['aptmirror']
114
115 # create N2VC connector
116 self.n2vc = N2VCJujuConnector(
117 db=self.db,
118 fs=self.fs,
119 log=self.logger,
120 loop=self.loop,
121 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
122 username=self.vca_config.get('user', None),
123 vca_config=self.vca_config,
124 on_update_db=self._on_update_n2vc_db
125 )
126
127 self.k8sclusterhelm = K8sHelmConnector(
128 kubectl_command=self.vca_config.get("kubectlpath"),
129 helm_command=self.vca_config.get("helmpath"),
130 fs=self.fs,
131 log=self.logger,
132 db=self.db,
133 on_update_db=None,
134 )
135
136 self.k8sclusterjuju = K8sJujuConnector(
137 kubectl_command=self.vca_config.get("kubectlpath"),
138 juju_command=self.vca_config.get("jujupath"),
139 fs=self.fs,
140 log=self.logger,
141 db=self.db,
142 on_update_db=None,
143 )
144
145 # create RO client
146 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
147
148 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
149
150 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
151
152 try:
153 # TODO filter RO descriptor fields...
154
155 # write to database
156 db_dict = dict()
157 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
158 db_dict['deploymentStatus'] = ro_descriptor
159 self.update_db_2("nsrs", nsrs_id, db_dict)
160
161 except Exception as e:
162 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
163
164 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
165
166 # remove last dot from path (if exists)
167 if path.endswith('.'):
168 path = path[:-1]
169
170 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
171 # .format(table, filter, path, updated_data))
172
173 try:
174
175 nsr_id = filter.get('_id')
176
177 # read ns record from database
178 nsr = self.db.get_one(table='nsrs', q_filter=filter)
179 current_ns_status = nsr.get('nsState')
180
181 # get vca status for NS
182 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
183
184 # vcaStatus
185 db_dict = dict()
186 db_dict['vcaStatus'] = status_dict
187
188 # update configurationStatus for this VCA
189 try:
190 vca_index = int(path[path.rfind(".")+1:])
191
192 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
193 vca_status = vca_list[vca_index].get('status')
194
195 configuration_status_list = nsr.get('configurationStatus')
196 config_status = configuration_status_list[vca_index].get('status')
197
198 if config_status == 'BROKEN' and vca_status != 'failed':
199 db_dict['configurationStatus'][vca_index] = 'READY'
200 elif config_status != 'BROKEN' and vca_status == 'failed':
201 db_dict['configurationStatus'][vca_index] = 'BROKEN'
202 except Exception as e:
203 # not update configurationStatus
204 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
205
206 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
207 # if nsState = 'DEGRADED' check if all is OK
208 is_degraded = False
209 if current_ns_status in ('READY', 'DEGRADED'):
210 error_description = ''
211 # check machines
212 if status_dict.get('machines'):
213 for machine_id in status_dict.get('machines'):
214 machine = status_dict.get('machines').get(machine_id)
215 # check machine agent-status
216 if machine.get('agent-status'):
217 s = machine.get('agent-status').get('status')
218 if s != 'started':
219 is_degraded = True
220 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
221 # check machine instance status
222 if machine.get('instance-status'):
223 s = machine.get('instance-status').get('status')
224 if s != 'running':
225 is_degraded = True
226 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
227 # check applications
228 if status_dict.get('applications'):
229 for app_id in status_dict.get('applications'):
230 app = status_dict.get('applications').get(app_id)
231 # check application status
232 if app.get('status'):
233 s = app.get('status').get('status')
234 if s != 'active':
235 is_degraded = True
236 error_description += 'application {} status={} ; '.format(app_id, s)
237
238 if error_description:
239 db_dict['errorDescription'] = error_description
240 if current_ns_status == 'READY' and is_degraded:
241 db_dict['nsState'] = 'DEGRADED'
242 if current_ns_status == 'DEGRADED' and not is_degraded:
243 db_dict['nsState'] = 'READY'
244
245 # write to database
246 self.update_db_2("nsrs", nsr_id, db_dict)
247
248 except Exception as e:
249 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
250
251 return
252
253 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
254 """
255 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
256 :param vnfd: input vnfd
257 :param new_id: overrides vnf id if provided
258 :param additionalParams: Instantiation params for VNFs provided
259 :param nsrId: Id of the NSR
260 :return: copy of vnfd
261 """
262 try:
263 vnfd_RO = deepcopy(vnfd)
264 # remove unused by RO configuration, monitoring, scaling and internal keys
265 vnfd_RO.pop("_id", None)
266 vnfd_RO.pop("_admin", None)
267 vnfd_RO.pop("vnf-configuration", None)
268 vnfd_RO.pop("monitoring-param", None)
269 vnfd_RO.pop("scaling-group-descriptor", None)
270 vnfd_RO.pop("kdu", None)
271 vnfd_RO.pop("k8s-cluster", None)
272 if new_id:
273 vnfd_RO["id"] = new_id
274
275 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
276 for vdu in get_iterable(vnfd_RO, "vdu"):
277 cloud_init_file = None
278 if vdu.get("cloud-init-file"):
279 base_folder = vnfd["_admin"]["storage"]
280 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
281 vdu["cloud-init-file"])
282 with self.fs.file_open(cloud_init_file, "r") as ci_file:
283 cloud_init_content = ci_file.read()
284 vdu.pop("cloud-init-file", None)
285 elif vdu.get("cloud-init"):
286 cloud_init_content = vdu["cloud-init"]
287 else:
288 continue
289
290 env = Environment()
291 ast = env.parse(cloud_init_content)
292 mandatory_vars = meta.find_undeclared_variables(ast)
293 if mandatory_vars:
294 for var in mandatory_vars:
295 if not additionalParams or var not in additionalParams.keys():
296 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
297 "file, must be provided in the instantiation parameters inside the "
298 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
299 template = Template(cloud_init_content)
300 cloud_init_content = template.render(additionalParams or {})
301 vdu["cloud-init"] = cloud_init_content
302
303 return vnfd_RO
304 except FsException as e:
305 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
306 format(vnfd["id"], vdu["id"], cloud_init_file, e))
307 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
308 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
309 format(vnfd["id"], vdu["id"], e))
310
311 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
312 """
313 Creates a RO ns descriptor from OSM ns_instantiate params
314 :param ns_params: OSM instantiate params
315 :return: The RO ns descriptor
316 """
317 vim_2_RO = {}
318 wim_2_RO = {}
319 # TODO feature 1417: Check that no instantiation is set over PDU
320 # check if PDU forces a concrete vim-network-id and add it
321 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
322
323 def vim_account_2_RO(vim_account):
324 if vim_account in vim_2_RO:
325 return vim_2_RO[vim_account]
326
327 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
328 if db_vim["_admin"]["operationalState"] != "ENABLED":
329 raise LcmException("VIM={} is not available. operationalState={}".format(
330 vim_account, db_vim["_admin"]["operationalState"]))
331 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
332 vim_2_RO[vim_account] = RO_vim_id
333 return RO_vim_id
334
335 def wim_account_2_RO(wim_account):
336 if isinstance(wim_account, str):
337 if wim_account in wim_2_RO:
338 return wim_2_RO[wim_account]
339
340 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
341 if db_wim["_admin"]["operationalState"] != "ENABLED":
342 raise LcmException("WIM={} is not available. operationalState={}".format(
343 wim_account, db_wim["_admin"]["operationalState"]))
344 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
345 wim_2_RO[wim_account] = RO_wim_id
346 return RO_wim_id
347 else:
348 return wim_account
349
350 def ip_profile_2_RO(ip_profile):
351 RO_ip_profile = deepcopy((ip_profile))
352 if "dns-server" in RO_ip_profile:
353 if isinstance(RO_ip_profile["dns-server"], list):
354 RO_ip_profile["dns-address"] = []
355 for ds in RO_ip_profile.pop("dns-server"):
356 RO_ip_profile["dns-address"].append(ds['address'])
357 else:
358 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
359 if RO_ip_profile.get("ip-version") == "ipv4":
360 RO_ip_profile["ip-version"] = "IPv4"
361 if RO_ip_profile.get("ip-version") == "ipv6":
362 RO_ip_profile["ip-version"] = "IPv6"
363 if "dhcp-params" in RO_ip_profile:
364 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
365 return RO_ip_profile
366
367 if not ns_params:
368 return None
369 RO_ns_params = {
370 # "name": ns_params["nsName"],
371 # "description": ns_params.get("nsDescription"),
372 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
373 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
374 # "scenario": ns_params["nsdId"],
375 }
376
377 n2vc_key_list = n2vc_key_list or []
378 for vnfd_ref, vnfd in vnfd_dict.items():
379 vdu_needed_access = []
380 mgmt_cp = None
381 if vnfd.get("vnf-configuration"):
382 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
383 if ssh_required and vnfd.get("mgmt-interface"):
384 if vnfd["mgmt-interface"].get("vdu-id"):
385 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
386 elif vnfd["mgmt-interface"].get("cp"):
387 mgmt_cp = vnfd["mgmt-interface"]["cp"]
388
389 for vdu in vnfd.get("vdu", ()):
390 if vdu.get("vdu-configuration"):
391 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
392 if ssh_required:
393 vdu_needed_access.append(vdu["id"])
394 elif mgmt_cp:
395 for vdu_interface in vdu.get("interface"):
396 if vdu_interface.get("external-connection-point-ref") and \
397 vdu_interface["external-connection-point-ref"] == mgmt_cp:
398 vdu_needed_access.append(vdu["id"])
399 mgmt_cp = None
400 break
401
402 if vdu_needed_access:
403 for vnf_member in nsd.get("constituent-vnfd"):
404 if vnf_member["vnfd-id-ref"] != vnfd_ref:
405 continue
406 for vdu in vdu_needed_access:
407 populate_dict(RO_ns_params,
408 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
409 n2vc_key_list)
410
411 if ns_params.get("vduImage"):
412 RO_ns_params["vduImage"] = ns_params["vduImage"]
413
414 if ns_params.get("ssh_keys"):
415 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
416 for vnf_params in get_iterable(ns_params, "vnf"):
417 for constituent_vnfd in nsd["constituent-vnfd"]:
418 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
419 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
420 break
421 else:
422 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
423 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
424 if vnf_params.get("vimAccountId"):
425 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
426 vim_account_2_RO(vnf_params["vimAccountId"]))
427
428 for vdu_params in get_iterable(vnf_params, "vdu"):
429 # TODO feature 1417: check that this VDU exist and it is not a PDU
430 if vdu_params.get("volume"):
431 for volume_params in vdu_params["volume"]:
432 if volume_params.get("vim-volume-id"):
433 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
434 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
435 volume_params["vim-volume-id"])
436 if vdu_params.get("interface"):
437 for interface_params in vdu_params["interface"]:
438 if interface_params.get("ip-address"):
439 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
440 vdu_params["id"], "interfaces", interface_params["name"],
441 "ip_address"),
442 interface_params["ip-address"])
443 if interface_params.get("mac-address"):
444 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
445 vdu_params["id"], "interfaces", interface_params["name"],
446 "mac_address"),
447 interface_params["mac-address"])
448 if interface_params.get("floating-ip-required"):
449 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
450 vdu_params["id"], "interfaces", interface_params["name"],
451 "floating-ip"),
452 interface_params["floating-ip-required"])
453
454 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
455 if internal_vld_params.get("vim-network-name"):
456 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
457 internal_vld_params["name"], "vim-network-name"),
458 internal_vld_params["vim-network-name"])
459 if internal_vld_params.get("vim-network-id"):
460 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
461 internal_vld_params["name"], "vim-network-id"),
462 internal_vld_params["vim-network-id"])
463 if internal_vld_params.get("ip-profile"):
464 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
465 internal_vld_params["name"], "ip-profile"),
466 ip_profile_2_RO(internal_vld_params["ip-profile"]))
467 if internal_vld_params.get("provider-network"):
468
469 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
470 internal_vld_params["name"], "provider-network"),
471 internal_vld_params["provider-network"].copy())
472
473 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
474 # look for interface
475 iface_found = False
476 for vdu_descriptor in vnf_descriptor["vdu"]:
477 for vdu_interface in vdu_descriptor["interface"]:
478 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
479 if icp_params.get("ip-address"):
480 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
481 vdu_descriptor["id"], "interfaces",
482 vdu_interface["name"], "ip_address"),
483 icp_params["ip-address"])
484
485 if icp_params.get("mac-address"):
486 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
487 vdu_descriptor["id"], "interfaces",
488 vdu_interface["name"], "mac_address"),
489 icp_params["mac-address"])
490 iface_found = True
491 break
492 if iface_found:
493 break
494 else:
495 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
496 "internal-vld:id-ref={} is not present at vnfd:internal-"
497 "connection-point".format(vnf_params["member-vnf-index"],
498 icp_params["id-ref"]))
499
500 for vld_params in get_iterable(ns_params, "vld"):
501 if "ip-profile" in vld_params:
502 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
503 ip_profile_2_RO(vld_params["ip-profile"]))
504
505 if vld_params.get("provider-network"):
506
507 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
508 vld_params["provider-network"].copy())
509
510 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
511 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
512 wim_account_2_RO(vld_params["wimAccountId"])),
513 if vld_params.get("vim-network-name"):
514 RO_vld_sites = []
515 if isinstance(vld_params["vim-network-name"], dict):
516 for vim_account, vim_net in vld_params["vim-network-name"].items():
517 RO_vld_sites.append({
518 "netmap-use": vim_net,
519 "datacenter": vim_account_2_RO(vim_account)
520 })
521 else: # isinstance str
522 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
523 if RO_vld_sites:
524 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
525
526 if vld_params.get("vim-network-id"):
527 RO_vld_sites = []
528 if isinstance(vld_params["vim-network-id"], dict):
529 for vim_account, vim_net in vld_params["vim-network-id"].items():
530 RO_vld_sites.append({
531 "netmap-use": vim_net,
532 "datacenter": vim_account_2_RO(vim_account)
533 })
534 else: # isinstance str
535 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
536 if RO_vld_sites:
537 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
538 if vld_params.get("ns-net"):
539 if isinstance(vld_params["ns-net"], dict):
540 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
541 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
542 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
543 if "vnfd-connection-point-ref" in vld_params:
544 for cp_params in vld_params["vnfd-connection-point-ref"]:
545 # look for interface
546 for constituent_vnfd in nsd["constituent-vnfd"]:
547 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
548 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
549 break
550 else:
551 raise LcmException(
552 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
553 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
554 match_cp = False
555 for vdu_descriptor in vnf_descriptor["vdu"]:
556 for interface_descriptor in vdu_descriptor["interface"]:
557 if interface_descriptor.get("external-connection-point-ref") == \
558 cp_params["vnfd-connection-point-ref"]:
559 match_cp = True
560 break
561 if match_cp:
562 break
563 else:
564 raise LcmException(
565 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
566 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
567 cp_params["member-vnf-index-ref"],
568 cp_params["vnfd-connection-point-ref"],
569 vnf_descriptor["id"]))
570 if cp_params.get("ip-address"):
571 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
572 vdu_descriptor["id"], "interfaces",
573 interface_descriptor["name"], "ip_address"),
574 cp_params["ip-address"])
575 if cp_params.get("mac-address"):
576 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
577 vdu_descriptor["id"], "interfaces",
578 interface_descriptor["name"], "mac_address"),
579 cp_params["mac-address"])
580 return RO_ns_params
581
582 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
583 # make a copy to do not change
584 vdu_create = copy(vdu_create)
585 vdu_delete = copy(vdu_delete)
586
587 vdurs = db_vnfr.get("vdur")
588 if vdurs is None:
589 vdurs = []
590 vdu_index = len(vdurs)
591 while vdu_index:
592 vdu_index -= 1
593 vdur = vdurs[vdu_index]
594 if vdur.get("pdu-type"):
595 continue
596 vdu_id_ref = vdur["vdu-id-ref"]
597 if vdu_create and vdu_create.get(vdu_id_ref):
598 for index in range(0, vdu_create[vdu_id_ref]):
599 vdur = deepcopy(vdur)
600 vdur["_id"] = str(uuid4())
601 vdur["count-index"] += 1
602 vdurs.insert(vdu_index+1+index, vdur)
603 del vdu_create[vdu_id_ref]
604 if vdu_delete and vdu_delete.get(vdu_id_ref):
605 del vdurs[vdu_index]
606 vdu_delete[vdu_id_ref] -= 1
607 if not vdu_delete[vdu_id_ref]:
608 del vdu_delete[vdu_id_ref]
609 # check all operations are done
610 if vdu_create or vdu_delete:
611 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
612 vdu_create))
613 if vdu_delete:
614 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
615 vdu_delete))
616
617 vnfr_update = {"vdur": vdurs}
618 db_vnfr["vdur"] = vdurs
619 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
620
621 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
622 """
623 Updates database nsr with the RO info for the created vld
624 :param ns_update_nsr: dictionary to be filled with the updated info
625 :param db_nsr: content of db_nsr. This is also modified
626 :param nsr_desc_RO: nsr descriptor from RO
627 :return: Nothing, LcmException is raised on errors
628 """
629
630 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
631 for net_RO in get_iterable(nsr_desc_RO, "nets"):
632 if vld["id"] != net_RO.get("ns_net_osm_id"):
633 continue
634 vld["vim-id"] = net_RO.get("vim_net_id")
635 vld["name"] = net_RO.get("vim_name")
636 vld["status"] = net_RO.get("status")
637 vld["status-detailed"] = net_RO.get("error_msg")
638 ns_update_nsr["vld.{}".format(vld_index)] = vld
639 break
640 else:
641 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
642
643 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
644 """
645 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
646 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
647 :param nsr_desc_RO: nsr descriptor from RO
648 :return: Nothing, LcmException is raised on errors
649 """
650 for vnf_index, db_vnfr in db_vnfrs.items():
651 for vnf_RO in nsr_desc_RO["vnfs"]:
652 if vnf_RO["member_vnf_index"] != vnf_index:
653 continue
654 vnfr_update = {}
655 if vnf_RO.get("ip_address"):
656 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
657 elif not db_vnfr.get("ip-address"):
658 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
659
660 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
661 vdur_RO_count_index = 0
662 if vdur.get("pdu-type"):
663 continue
664 for vdur_RO in get_iterable(vnf_RO, "vms"):
665 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
666 continue
667 if vdur["count-index"] != vdur_RO_count_index:
668 vdur_RO_count_index += 1
669 continue
670 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
671 if vdur_RO.get("ip_address"):
672 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
673 else:
674 vdur["ip-address"] = None
675 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
676 vdur["name"] = vdur_RO.get("vim_name")
677 vdur["status"] = vdur_RO.get("status")
678 vdur["status-detailed"] = vdur_RO.get("error_msg")
679 for ifacer in get_iterable(vdur, "interfaces"):
680 for interface_RO in get_iterable(vdur_RO, "interfaces"):
681 if ifacer["name"] == interface_RO.get("internal_name"):
682 ifacer["ip-address"] = interface_RO.get("ip_address")
683 ifacer["mac-address"] = interface_RO.get("mac_address")
684 break
685 else:
686 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
687 "from VIM info"
688 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
689 vnfr_update["vdur.{}".format(vdu_index)] = vdur
690 break
691 else:
692 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
693 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
694
695 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
696 for net_RO in get_iterable(nsr_desc_RO, "nets"):
697 if vld["id"] != net_RO.get("vnf_net_osm_id"):
698 continue
699 vld["vim-id"] = net_RO.get("vim_net_id")
700 vld["name"] = net_RO.get("vim_name")
701 vld["status"] = net_RO.get("status")
702 vld["status-detailed"] = net_RO.get("error_msg")
703 vnfr_update["vld.{}".format(vld_index)] = vld
704 break
705 else:
706 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
707 vnf_index, vld["id"]))
708
709 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
710 break
711
712 else:
713 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
714
715 def _get_ns_config_info(self, nsr_id):
716 """
717 Generates a mapping between vnf,vdu elements and the N2VC id
718 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
719 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
720 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
721 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
722 """
723 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
724 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
725 mapping = {}
726 ns_config_info = {"osm-config-mapping": mapping}
727 for vca in vca_deployed_list:
728 if not vca["member-vnf-index"]:
729 continue
730 if not vca["vdu_id"]:
731 mapping[vca["member-vnf-index"]] = vca["application"]
732 else:
733 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
734 vca["application"]
735 return ns_config_info
736
737 @staticmethod
738 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
739 """
740 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
741 primitives as verify-ssh-credentials, or config when needed
742 :param desc_primitive_list: information of the descriptor
743 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
744 this element contains a ssh public key
745 :return: The modified list. Can ba an empty list, but always a list
746 """
747 if desc_primitive_list:
748 primitive_list = desc_primitive_list.copy()
749 else:
750 primitive_list = []
751 # look for primitive config, and get the position. None if not present
752 config_position = None
753 for index, primitive in enumerate(primitive_list):
754 if primitive["name"] == "config":
755 config_position = index
756 break
757
758 # for NS, add always a config primitive if not present (bug 874)
759 if not vca_deployed["member-vnf-index"] and config_position is None:
760 primitive_list.insert(0, {"name": "config", "parameter": []})
761 config_position = 0
762 # for VNF/VDU add verify-ssh-credentials after config
763 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
764 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
765 return primitive_list
766
767 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
768 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
769
770 db_nsr_update = {}
771 RO_descriptor_number = 0 # number of descriptors created at RO
772 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
773 start_deploy = time()
774 vdu_flag = False # If any of the VNFDs has VDUs
775 ns_params = db_nslcmop.get("operationParams")
776
777 # deploy RO
778
779 # get vnfds, instantiate at RO
780
781 for c_vnf in nsd.get("constituent-vnfd", ()):
782 member_vnf_index = c_vnf["member-vnf-index"]
783 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
784 if vnfd.get("vdu"):
785 vdu_flag = True
786 vnfd_ref = vnfd["id"]
787 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
788 " RO".format(vnfd_ref, member_vnf_index)
789 # self.logger.debug(logging_text + step)
790 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
791 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
792 RO_descriptor_number += 1
793
794 # look position at deployed.RO.vnfd if not present it will be appended at the end
795 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
796 if vnf_deployed["member-vnf-index"] == member_vnf_index:
797 break
798 else:
799 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
800 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
801
802 # look if present
803 RO_update = {"member-vnf-index": member_vnf_index}
804 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
805 if vnfd_list:
806 RO_update["id"] = vnfd_list[0]["uuid"]
807 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
808 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
809 else:
810 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
811 get("additionalParamsForVnf"), nsr_id)
812 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
813 RO_update["id"] = desc["uuid"]
814 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
815 vnfd_ref, member_vnf_index, desc["uuid"]))
816 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
817 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
818 self.update_db_2("nsrs", nsr_id, db_nsr_update)
819
820 # create nsd at RO
821 nsd_ref = nsd["id"]
822 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
823 # self.logger.debug(logging_text + step)
824
825 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
826 RO_descriptor_number += 1
827 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
828 if nsd_list:
829 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
830 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
831 nsd_ref, RO_nsd_uuid))
832 else:
833 nsd_RO = deepcopy(nsd)
834 nsd_RO["id"] = RO_osm_nsd_id
835 nsd_RO.pop("_id", None)
836 nsd_RO.pop("_admin", None)
837 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
838 member_vnf_index = c_vnf["member-vnf-index"]
839 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
840 for c_vld in nsd_RO.get("vld", ()):
841 for cp in c_vld.get("vnfd-connection-point-ref", ()):
842 member_vnf_index = cp["member-vnf-index-ref"]
843 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
844
845 desc = await self.RO.create("nsd", descriptor=nsd_RO)
846 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
847 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
848 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
849 self.update_db_2("nsrs", nsr_id, db_nsr_update)
850
851 # Crate ns at RO
852 # if present use it unless in error status
853 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
854 if RO_nsr_id:
855 try:
856 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
857 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
858 desc = await self.RO.show("ns", RO_nsr_id)
859
860 except ROclient.ROClientException as e:
861 if e.http_code != HTTPStatus.NOT_FOUND:
862 raise
863 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
864 if RO_nsr_id:
865 ns_status, ns_status_info = self.RO.check_ns_status(desc)
866 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
867 if ns_status == "ERROR":
868 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
869 .format(RO_nsr_id)
870 self.logger.debug(logging_text + step)
871 await self.RO.delete("ns", RO_nsr_id)
872 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
873 if not RO_nsr_id:
874 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
875 # self.logger.debug(logging_text + step)
876
877 # check if VIM is creating and wait look if previous tasks in process
878 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
879 if task_dependency:
880 step = "Waiting for related tasks to be completed: {}".format(task_name)
881 self.logger.debug(logging_text + step)
882 await asyncio.wait(task_dependency, timeout=3600)
883 if ns_params.get("vnf"):
884 for vnf in ns_params["vnf"]:
885 if "vimAccountId" in vnf:
886 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
887 vnf["vimAccountId"])
888 if task_dependency:
889 step = "Waiting for related tasks to be completed: {}".format(task_name)
890 self.logger.debug(logging_text + step)
891 await asyncio.wait(task_dependency, timeout=3600)
892
893 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
894
895 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
896
897 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
898 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
899 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
900 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
901 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
902 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
903 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
904 self.update_db_2("nsrs", nsr_id, db_nsr_update)
905
906 # wait until NS is ready
907 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
908 detailed_status_old = None
909 self.logger.debug(logging_text + step)
910
911 old_desc = None
912 while time() <= start_deploy + self.total_deploy_timeout:
913 desc = await self.RO.show("ns", RO_nsr_id)
914
915 # deploymentStatus
916 if desc != old_desc:
917 # desc has changed => update db
918 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
919 old_desc = desc
920
921 ns_status, ns_status_info = self.RO.check_ns_status(desc)
922 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
923 if ns_status == "ERROR":
924 raise ROclient.ROClientException(ns_status_info)
925 elif ns_status == "BUILD":
926 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
927 elif ns_status == "ACTIVE":
928 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
929 try:
930 if vdu_flag:
931 self.ns_update_vnfr(db_vnfrs, desc)
932 break
933 except LcmExceptionNoMgmtIP:
934 pass
935 else:
936 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
937 if detailed_status != detailed_status_old:
938 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
939 self.update_db_2("nsrs", nsr_id, db_nsr_update)
940 await asyncio.sleep(5, loop=self.loop)
941 else: # total_deploy_timeout
942 raise ROclient.ROClientException("Timeout waiting ns to be ready")
943
944 step = "Updating NSR"
945 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
946
947 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
948 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
949 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
950 self.update_db_2("nsrs", nsr_id, db_nsr_update)
951 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
952
953 step = "Deployed at VIM"
954 self.logger.debug(logging_text + step)
955
956 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
957 """
958 Wait for ip addres at RO, and optionally, insert public key in virtual machine
959 :param logging_text: prefix use for logging
960 :param nsr_id:
961 :param vnfr_id:
962 :param vdu_id:
963 :param vdu_index:
964 :param pub_key: public ssh key to inject, None to skip
965 :param user: user to apply the public ssh key
966 :return: IP address
967 """
968
969 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
970 ro_nsr_id = None
971 ip_address = None
972 nb_tries = 0
973 target_vdu_id = None
974 ro_retries = 0
975
976 while True:
977
978 ro_retries += 1
979 if ro_retries >= 360: # 1 hour
980 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
981
982 await asyncio.sleep(10, loop=self.loop)
983 # wait until NS is deployed at RO
984 if not ro_nsr_id:
985 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
986 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
987 if not ro_nsr_id:
988 continue
989
990 # get ip address
991 if not target_vdu_id:
992 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
993
994 if not vdu_id: # for the VNF case
995 ip_address = db_vnfr.get("ip-address")
996 if not ip_address:
997 continue
998 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
999 else: # VDU case
1000 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1001 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1002
1003 if not vdur:
1004 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
1005 vnfr_id, vdu_id, vdu_index
1006 ))
1007
1008 if vdur.get("status") == "ACTIVE":
1009 ip_address = vdur.get("ip-address")
1010 if not ip_address:
1011 continue
1012 target_vdu_id = vdur["vdu-id-ref"]
1013 elif vdur.get("status") == "ERROR":
1014 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1015
1016 if not target_vdu_id:
1017 continue
1018
1019 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
1020
1021 # inject public key into machine
1022 if pub_key and user:
1023 # self.logger.debug(logging_text + "Inserting RO key")
1024 try:
1025 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1026 result_dict = await self.RO.create_action(
1027 item="ns",
1028 item_id_name=ro_nsr_id,
1029 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1030 )
1031 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1032 if not result_dict or not isinstance(result_dict, dict):
1033 raise LcmException("Unknown response from RO when injecting key")
1034 for result in result_dict.values():
1035 if result.get("vim_result") == 200:
1036 break
1037 else:
1038 raise ROclient.ROClientException("error injecting key: {}".format(
1039 result.get("description")))
1040 break
1041 except ROclient.ROClientException as e:
1042 if not nb_tries:
1043 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1044 format(e, 20*10))
1045 nb_tries += 1
1046 if nb_tries >= 20:
1047 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1048 else:
1049 break
1050
1051 return ip_address
1052
1053 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1054 """
1055 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1056 """
1057 my_vca = vca_deployed_list[vca_index]
1058 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1059 # vdu or kdu: no dependencies
1060 return
1061 timeout = 300
1062 while timeout >= 0:
1063 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1064 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1065 configuration_status_list = db_nsr["configurationStatus"]
1066 for index, vca_deployed in enumerate(configuration_status_list):
1067 if index == vca_index:
1068 # myself
1069 continue
1070 if not my_vca.get("member-vnf-index") or \
1071 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1072 internal_status = configuration_status_list[index].get("status")
1073 if internal_status == 'READY':
1074 continue
1075 elif internal_status == 'BROKEN':
1076 raise LcmException("Configuration aborted because dependent charm/s has failed")
1077 else:
1078 break
1079 else:
1080 # no dependencies, return
1081 return
1082 await asyncio.sleep(10)
1083 timeout -= 1
1084
1085 raise LcmException("Configuration aborted because dependent charm/s timeout")
1086
1087 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
1088 vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
1089 nsr_id = db_nsr["_id"]
1090 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1091 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1092 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1093 db_dict = {
1094 'collection': 'nsrs',
1095 'filter': {'_id': nsr_id},
1096 'path': db_update_entry
1097 }
1098 step = ""
1099 try:
1100
1101 element_type = 'NS'
1102 element_under_configuration = nsr_id
1103
1104 vnfr_id = None
1105 if db_vnfr:
1106 vnfr_id = db_vnfr["_id"]
1107
1108 namespace = "{nsi}.{ns}".format(
1109 nsi=nsi_id if nsi_id else "",
1110 ns=nsr_id)
1111
1112 if vnfr_id:
1113 element_type = 'VNF'
1114 element_under_configuration = vnfr_id
1115 namespace += ".{}".format(vnfr_id)
1116 if vdu_id:
1117 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1118 element_type = 'VDU'
1119 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1120
1121 # Get artifact path
1122 artifact_path = "{}/{}/charms/{}".format(
1123 base_folder["folder"],
1124 base_folder["pkg-dir"],
1125 config_descriptor["juju"]["charm"]
1126 )
1127
1128 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1129 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1130 is_proxy_charm = False
1131
1132 # n2vc_redesign STEP 3.1
1133
1134 # find old ee_id if exists
1135 ee_id = vca_deployed.get("ee_id")
1136
1137 # create or register execution environment in VCA
1138 if is_proxy_charm:
1139
1140 await self._write_configuration_status(
1141 nsr_id=nsr_id,
1142 vca_index=vca_index,
1143 status='CREATING',
1144 element_under_configuration=element_under_configuration,
1145 element_type=element_type
1146 )
1147
1148 step = "create execution environment"
1149 self.logger.debug(logging_text + step)
1150 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1151 reuse_ee_id=ee_id,
1152 db_dict=db_dict)
1153
1154 else:
1155 step = "Waiting to VM being up and getting IP address"
1156 self.logger.debug(logging_text + step)
1157 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1158 user=None, pub_key=None)
1159 credentials = {"hostname": rw_mgmt_ip}
1160 # get username
1161 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1162 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1163 # merged. Meanwhile let's get username from initial-config-primitive
1164 if not username and config_descriptor.get("initial-config-primitive"):
1165 for config_primitive in config_descriptor["initial-config-primitive"]:
1166 for param in config_primitive.get("parameter", ()):
1167 if param["name"] == "ssh-username":
1168 username = param["value"]
1169 break
1170 if not username:
1171 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1172 "'config-access.ssh-access.default-user'")
1173 credentials["username"] = username
1174 # n2vc_redesign STEP 3.2
1175
1176 await self._write_configuration_status(
1177 nsr_id=nsr_id,
1178 vca_index=vca_index,
1179 status='REGISTERING',
1180 element_under_configuration=element_under_configuration,
1181 element_type=element_type
1182 )
1183
1184 step = "register execution environment {}".format(credentials)
1185 self.logger.debug(logging_text + step)
1186 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1187 db_dict=db_dict)
1188
1189 # for compatibility with MON/POL modules, the need model and application name at database
1190 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1191 ee_id_parts = ee_id.split('.')
1192 model_name = ee_id_parts[0]
1193 application_name = ee_id_parts[1]
1194 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1195 db_update_entry + "application": application_name,
1196 db_update_entry + "ee_id": ee_id})
1197
1198 # n2vc_redesign STEP 3.3
1199
1200 step = "Install configuration Software"
1201
1202 await self._write_configuration_status(
1203 nsr_id=nsr_id,
1204 vca_index=vca_index,
1205 status='INSTALLING SW',
1206 element_under_configuration=element_under_configuration,
1207 element_type=element_type
1208 )
1209
1210 # TODO check if already done
1211 self.logger.debug(logging_text + step)
1212 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1213
1214 # if SSH access is required, then get execution environment SSH public
1215 if is_proxy_charm: # if native charm we have waited already to VM be UP
1216 pub_key = None
1217 user = None
1218 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1219 # Needed to inject a ssh key
1220 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1221 step = "Install configuration Software, getting public ssh key"
1222 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1223
1224 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1225 else:
1226 step = "Waiting to VM being up and getting IP address"
1227 self.logger.debug(logging_text + step)
1228
1229 # n2vc_redesign STEP 5.1
1230 # wait for RO (ip-address) Insert pub_key into VM
1231 if vnfr_id:
1232 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1233 user=user, pub_key=pub_key)
1234 else:
1235 rw_mgmt_ip = None # This is for a NS configuration
1236
1237 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1238
1239 # store rw_mgmt_ip in deploy params for later replacement
1240 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1241
1242 # n2vc_redesign STEP 6 Execute initial config primitive
1243 step = 'execute initial config primitive'
1244 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1245
1246 # sort initial config primitives by 'seq'
1247 try:
1248 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1249 except Exception as e:
1250 self.logger.error(logging_text + step + ": " + str(e))
1251
1252 # add config if not present for NS charm
1253 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1254 vca_deployed)
1255
1256 # wait for dependent primitives execution (NS -> VNF -> VDU)
1257 if initial_config_primitive_list:
1258 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1259
1260 # stage, in function of element type: vdu, kdu, vnf or ns
1261 my_vca = vca_deployed_list[vca_index]
1262 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1263 # VDU or KDU
1264 stage = 'Stage 3/5: running Day-1 primitives for VDU'
1265 elif my_vca.get("member-vnf-index"):
1266 # VNF
1267 stage = 'Stage 4/5: running Day-1 primitives for VNF'
1268 else:
1269 # NS
1270 stage = 'Stage 5/5: running Day-1 primitives for NS'
1271
1272 await self._write_configuration_status(
1273 nsr_id=nsr_id,
1274 vca_index=vca_index,
1275 status='EXECUTING PRIMITIVE'
1276 )
1277
1278 self._write_op_status(
1279 op_id=nslcmop_id,
1280 stage=stage
1281 )
1282
1283 for initial_config_primitive in initial_config_primitive_list:
1284 # adding information on the vca_deployed if it is a NS execution environment
1285 if not vca_deployed["member-vnf-index"]:
1286 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1287 # TODO check if already done
1288 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1289
1290 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1291 self.logger.debug(logging_text + step)
1292 await self.n2vc.exec_primitive(
1293 ee_id=ee_id,
1294 primitive_name=initial_config_primitive["name"],
1295 params_dict=primitive_params_,
1296 db_dict=db_dict
1297 )
1298
1299 # TODO register in database that primitive is done
1300
1301 step = "instantiated at VCA"
1302 self.logger.debug(logging_text + step)
1303
1304 await self._write_configuration_status(
1305 nsr_id=nsr_id,
1306 vca_index=vca_index,
1307 status='READY'
1308 )
1309
1310 except Exception as e: # TODO not use Exception but N2VC exception
1311 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1312 await self._write_configuration_status(
1313 nsr_id=nsr_id,
1314 vca_index=vca_index,
1315 status='BROKEN'
1316 )
1317 raise Exception("{} {}".format(step, e)) from e
1318 # TODO raise N2VC exception with 'step' extra information
1319
1320 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1321 error_description: str = None):
1322 try:
1323 db_dict = dict()
1324 if ns_state:
1325 db_dict["nsState"] = ns_state
1326 db_dict["currentOperation"] = current_operation
1327 db_dict["currentOperationID"] = current_operation_id
1328 db_dict["errorDescription"] = error_description
1329 self.update_db_2("nsrs", nsr_id, db_dict)
1330 except Exception as e:
1331 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1332
1333 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1334 try:
1335 db_dict = dict()
1336 db_dict['queuePosition'] = queuePosition
1337 db_dict['stage'] = stage
1338 if error_message:
1339 db_dict['errorMessage'] = error_message
1340 self.update_db_2("nslcmops", op_id, db_dict)
1341 except Exception as e:
1342 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1343
1344 def _write_all_config_status(self, nsr_id: str, status: str):
1345 try:
1346 # nsrs record
1347 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1348 # configurationStatus
1349 config_status = db_nsr.get('configurationStatus')
1350 if config_status:
1351 # update status
1352 db_dict = dict()
1353 db_dict['configurationStatus'] = list()
1354 for c in config_status:
1355 c['status'] = status
1356 db_dict['configurationStatus'].append(c)
1357 self.update_db_2("nsrs", nsr_id, db_dict)
1358
1359 except Exception as e:
1360 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1361
1362 async def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str,
1363 element_under_configuration: str = None, element_type: str = None):
1364
1365 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1366 # .format(vca_index, status))
1367
1368 try:
1369 db_path = 'configurationStatus.{}.'.format(vca_index)
1370 db_dict = dict()
1371 db_dict[db_path + 'status'] = status
1372 if element_under_configuration:
1373 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1374 if element_type:
1375 db_dict[db_path + 'elementType'] = element_type
1376 self.update_db_2("nsrs", nsr_id, db_dict)
1377 except Exception as e:
1378 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1379 .format(status, nsr_id, vca_index, e))
1380
1381 async def instantiate(self, nsr_id, nslcmop_id):
1382 """
1383
1384 :param nsr_id: ns instance to deploy
1385 :param nslcmop_id: operation to run
1386 :return:
1387 """
1388
1389 # Try to lock HA task here
1390 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1391 if not task_is_locked_by_me:
1392 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1393 return
1394
1395 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1396 self.logger.debug(logging_text + "Enter")
1397
1398 # get all needed from database
1399
1400 # database nsrs record
1401 db_nsr = None
1402
1403 # database nslcmops record
1404 db_nslcmop = None
1405
1406 # update operation on nsrs
1407 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1408 "_admin.current-operation": nslcmop_id,
1409 "_admin.operation-type": "instantiate"}
1410 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1411
1412 # update operation on nslcmops
1413 db_nslcmop_update = {}
1414
1415 nslcmop_operation_state = None
1416 db_vnfrs = {} # vnf's info indexed by member-index
1417 # n2vc_info = {}
1418 task_instantiation_list = []
1419 task_instantiation_info = {} # from task to info text
1420 exc = None
1421 try:
1422 # wait for any previous tasks in process
1423 step = "Waiting for previous operations to terminate"
1424 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1425
1426 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1427
1428 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1429 self._write_ns_status(
1430 nsr_id=nsr_id,
1431 ns_state="BUILDING",
1432 current_operation="INSTANTIATING",
1433 current_operation_id=nslcmop_id
1434 )
1435
1436 # read from db: operation
1437 step = "Getting nslcmop={} from db".format(nslcmop_id)
1438 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1439
1440 # read from db: ns
1441 step = "Getting nsr={} from db".format(nsr_id)
1442 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1443 # nsd is replicated into ns (no db read)
1444 nsd = db_nsr["nsd"]
1445 # nsr_name = db_nsr["name"] # TODO short-name??
1446
1447 # read from db: vnf's of this ns
1448 step = "Getting vnfrs from db"
1449 self.logger.debug(logging_text + step)
1450 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1451
1452 # read from db: vnfd's for every vnf
1453 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1454 db_vnfds = {} # every vnfd data indexed by vnf id
1455 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1456
1457 self._write_op_status(
1458 op_id=nslcmop_id,
1459 stage='Stage 1/5: preparation of the environment',
1460 queuePosition=0
1461 )
1462
1463 # for each vnf in ns, read vnfd
1464 for vnfr in db_vnfrs_list:
1465 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1466 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1467 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1468 # if we haven't this vnfd, read it from db
1469 if vnfd_id not in db_vnfds:
1470 # read from cb
1471 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1472 self.logger.debug(logging_text + step)
1473 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1474
1475 # store vnfd
1476 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1477 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1478 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1479
1480 # Get or generates the _admin.deployed.VCA list
1481 vca_deployed_list = None
1482 if db_nsr["_admin"].get("deployed"):
1483 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1484 if vca_deployed_list is None:
1485 vca_deployed_list = []
1486 configuration_status_list = []
1487 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1488 db_nsr_update["configurationStatus"] = configuration_status_list
1489 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1490 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1491 elif isinstance(vca_deployed_list, dict):
1492 # maintain backward compatibility. Change a dict to list at database
1493 vca_deployed_list = list(vca_deployed_list.values())
1494 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1495 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1496
1497 db_nsr_update["detailed-status"] = "creating"
1498 db_nsr_update["operational-status"] = "init"
1499
1500 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1501 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1502 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1503
1504 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1505 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1506 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1507
1508 # n2vc_redesign STEP 2 Deploy Network Scenario
1509
1510 self._write_op_status(
1511 op_id=nslcmop_id,
1512 stage='Stage 2/5: deployment of VMs and execution environments'
1513 )
1514
1515 self.logger.debug(logging_text + "Before deploy_kdus")
1516 # Call to deploy_kdus in case exists the "vdu:kdu" param
1517 task_kdu = asyncio.ensure_future(
1518 self.deploy_kdus(
1519 logging_text=logging_text,
1520 nsr_id=nsr_id,
1521 db_nsr=db_nsr,
1522 db_vnfrs=db_vnfrs,
1523 )
1524 )
1525 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1526 task_instantiation_info[task_kdu] = "Deploy KDUs"
1527 task_instantiation_list.append(task_kdu)
1528 # n2vc_redesign STEP 1 Get VCA public ssh-key
1529 # feature 1429. Add n2vc public key to needed VMs
1530 n2vc_key = self.n2vc.get_public_key()
1531 n2vc_key_list = [n2vc_key]
1532 if self.vca_config.get("public_key"):
1533 n2vc_key_list.append(self.vca_config["public_key"])
1534
1535 task_ro = asyncio.ensure_future(
1536 self.instantiate_RO(
1537 logging_text=logging_text,
1538 nsr_id=nsr_id,
1539 nsd=nsd,
1540 db_nsr=db_nsr,
1541 db_nslcmop=db_nslcmop,
1542 db_vnfrs=db_vnfrs,
1543 db_vnfds_ref=db_vnfds_ref,
1544 n2vc_key_list=n2vc_key_list
1545 )
1546 )
1547 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1548 task_instantiation_info[task_ro] = "Deploy at VIM"
1549 task_instantiation_list.append(task_ro)
1550
1551 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1552 step = "Looking for needed vnfd to configure with proxy charm"
1553 self.logger.debug(logging_text + step)
1554
1555 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1556 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1557 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1558 vnfd_id = c_vnf["vnfd-id-ref"]
1559 vnfd = db_vnfds_ref[vnfd_id]
1560 member_vnf_index = str(c_vnf["member-vnf-index"])
1561 db_vnfr = db_vnfrs[member_vnf_index]
1562 base_folder = vnfd["_admin"]["storage"]
1563 vdu_id = None
1564 vdu_index = 0
1565 vdu_name = None
1566 kdu_name = None
1567
1568 # Get additional parameters
1569 deploy_params = {}
1570 if db_vnfr.get("additionalParamsForVnf"):
1571 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1572
1573 descriptor_config = vnfd.get("vnf-configuration")
1574 if descriptor_config and descriptor_config.get("juju"):
1575 self._deploy_n2vc(
1576 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1577 db_nsr=db_nsr,
1578 db_vnfr=db_vnfr,
1579 nslcmop_id=nslcmop_id,
1580 nsr_id=nsr_id,
1581 nsi_id=nsi_id,
1582 vnfd_id=vnfd_id,
1583 vdu_id=vdu_id,
1584 kdu_name=kdu_name,
1585 member_vnf_index=member_vnf_index,
1586 vdu_index=vdu_index,
1587 vdu_name=vdu_name,
1588 deploy_params=deploy_params,
1589 descriptor_config=descriptor_config,
1590 base_folder=base_folder,
1591 task_instantiation_list=task_instantiation_list,
1592 task_instantiation_info=task_instantiation_info
1593 )
1594
1595 # Deploy charms for each VDU that supports one.
1596 for vdud in get_iterable(vnfd, 'vdu'):
1597 vdu_id = vdud["id"]
1598 descriptor_config = vdud.get('vdu-configuration')
1599 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1600 if vdur.get("additionalParams"):
1601 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1602 else:
1603 deploy_params_vdu = deploy_params
1604 if descriptor_config and descriptor_config.get("juju"):
1605 # look for vdu index in the db_vnfr["vdu"] section
1606 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1607 # if vdur["vdu-id-ref"] == vdu_id:
1608 # break
1609 # else:
1610 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1611 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1612 # vdu_name = vdur.get("name")
1613 vdu_name = None
1614 kdu_name = None
1615 for vdu_index in range(int(vdud.get("count", 1))):
1616 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1617 self._deploy_n2vc(
1618 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1619 member_vnf_index, vdu_id, vdu_index),
1620 db_nsr=db_nsr,
1621 db_vnfr=db_vnfr,
1622 nslcmop_id=nslcmop_id,
1623 nsr_id=nsr_id,
1624 nsi_id=nsi_id,
1625 vnfd_id=vnfd_id,
1626 vdu_id=vdu_id,
1627 kdu_name=kdu_name,
1628 member_vnf_index=member_vnf_index,
1629 vdu_index=vdu_index,
1630 vdu_name=vdu_name,
1631 deploy_params=deploy_params_vdu,
1632 descriptor_config=descriptor_config,
1633 base_folder=base_folder,
1634 task_instantiation_list=task_instantiation_list,
1635 task_instantiation_info=task_instantiation_info
1636 )
1637 for kdud in get_iterable(vnfd, 'kdu'):
1638 kdu_name = kdud["name"]
1639 descriptor_config = kdud.get('kdu-configuration')
1640 if descriptor_config and descriptor_config.get("juju"):
1641 vdu_id = None
1642 vdu_index = 0
1643 vdu_name = None
1644 # look for vdu index in the db_vnfr["vdu"] section
1645 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1646 # if vdur["vdu-id-ref"] == vdu_id:
1647 # break
1648 # else:
1649 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1650 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1651 # vdu_name = vdur.get("name")
1652 # vdu_name = None
1653
1654 self._deploy_n2vc(
1655 logging_text=logging_text,
1656 db_nsr=db_nsr,
1657 db_vnfr=db_vnfr,
1658 nslcmop_id=nslcmop_id,
1659 nsr_id=nsr_id,
1660 nsi_id=nsi_id,
1661 vnfd_id=vnfd_id,
1662 vdu_id=vdu_id,
1663 kdu_name=kdu_name,
1664 member_vnf_index=member_vnf_index,
1665 vdu_index=vdu_index,
1666 vdu_name=vdu_name,
1667 deploy_params=deploy_params,
1668 descriptor_config=descriptor_config,
1669 base_folder=base_folder,
1670 task_instantiation_list=task_instantiation_list,
1671 task_instantiation_info=task_instantiation_info
1672 )
1673
1674 # Check if this NS has a charm configuration
1675 descriptor_config = nsd.get("ns-configuration")
1676 if descriptor_config and descriptor_config.get("juju"):
1677 vnfd_id = None
1678 db_vnfr = None
1679 member_vnf_index = None
1680 vdu_id = None
1681 kdu_name = None
1682 vdu_index = 0
1683 vdu_name = None
1684
1685 # Get additional parameters
1686 deploy_params = {}
1687 if db_nsr.get("additionalParamsForNs"):
1688 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1689 base_folder = nsd["_admin"]["storage"]
1690 self._deploy_n2vc(
1691 logging_text=logging_text,
1692 db_nsr=db_nsr,
1693 db_vnfr=db_vnfr,
1694 nslcmop_id=nslcmop_id,
1695 nsr_id=nsr_id,
1696 nsi_id=nsi_id,
1697 vnfd_id=vnfd_id,
1698 vdu_id=vdu_id,
1699 kdu_name=kdu_name,
1700 member_vnf_index=member_vnf_index,
1701 vdu_index=vdu_index,
1702 vdu_name=vdu_name,
1703 deploy_params=deploy_params,
1704 descriptor_config=descriptor_config,
1705 base_folder=base_folder,
1706 task_instantiation_list=task_instantiation_list,
1707 task_instantiation_info=task_instantiation_info
1708 )
1709
1710 # Wait until all tasks of "task_instantiation_list" have been finished
1711
1712 # while time() <= start_deploy + self.total_deploy_timeout:
1713 error_text_list = []
1714 timeout = 3600
1715
1716 # let's begin with all OK
1717 instantiated_ok = True
1718 # let's begin with RO 'running' status (later we can change it)
1719 db_nsr_update["operational-status"] = "running"
1720 # let's begin with VCA 'configured' status (later we can change it)
1721 db_nsr_update["config-status"] = "configured"
1722
1723 if task_instantiation_list:
1724 # wait for all tasks completion
1725 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1726
1727 for task in pending:
1728 instantiated_ok = False
1729 if task == task_ro:
1730 # RO task is pending
1731 db_nsr_update["operational-status"] = "failed"
1732 elif task == task_kdu:
1733 # KDU task is pending
1734 db_nsr_update["operational-status"] = "failed"
1735 else:
1736 # A N2VC task is pending
1737 db_nsr_update["config-status"] = "failed"
1738 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1739 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1740 for task in done:
1741 if task.cancelled():
1742 instantiated_ok = False
1743 if task == task_ro:
1744 # RO task was cancelled
1745 db_nsr_update["operational-status"] = "failed"
1746 elif task == task_kdu:
1747 # KDU task was cancelled
1748 db_nsr_update["operational-status"] = "failed"
1749 else:
1750 # A N2VC was cancelled
1751 db_nsr_update["config-status"] = "failed"
1752 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1753 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1754 else:
1755 exc = task.exception()
1756 if exc:
1757 instantiated_ok = False
1758 if task == task_ro:
1759 # RO task raised an exception
1760 db_nsr_update["operational-status"] = "failed"
1761 elif task == task_kdu:
1762 # KDU task raised an exception
1763 db_nsr_update["operational-status"] = "failed"
1764 else:
1765 # A N2VC task raised an exception
1766 db_nsr_update["config-status"] = "failed"
1767 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1768
1769 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1770 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1771 else:
1772 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1773 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1774 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1775 else:
1776 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1777
1778 if error_text_list:
1779 error_text = "\n".join(error_text_list)
1780 db_nsr_update["detailed-status"] = error_text
1781 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1782 db_nslcmop_update["detailed-status"] = error_text
1783 db_nslcmop_update["statusEnteredTime"] = time()
1784 else:
1785 # all is done
1786 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1787 db_nslcmop_update["statusEnteredTime"] = time()
1788 db_nslcmop_update["detailed-status"] = "done"
1789 db_nsr_update["detailed-status"] = "done"
1790
1791 except (ROclient.ROClientException, DbException, LcmException) as e:
1792 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1793 exc = e
1794 except asyncio.CancelledError:
1795 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1796 exc = "Operation was cancelled"
1797 except Exception as e:
1798 exc = traceback.format_exc()
1799 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1800 exc_info=True)
1801 finally:
1802 if exc:
1803 if db_nsr:
1804 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1805 db_nsr_update["operational-status"] = "failed"
1806 db_nsr_update["config-status"] = "failed"
1807 if db_nslcmop:
1808 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1809 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1810 db_nslcmop_update["statusEnteredTime"] = time()
1811 try:
1812 if db_nsr:
1813 db_nsr_update["_admin.nslcmop"] = None
1814 db_nsr_update["_admin.current-operation"] = None
1815 db_nsr_update["_admin.operation-type"] = None
1816 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1817
1818 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1819 ns_state = None
1820 error_description = None
1821 if instantiated_ok:
1822 ns_state = "READY"
1823 else:
1824 ns_state = "BROKEN"
1825 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1826
1827 self._write_ns_status(
1828 nsr_id=nsr_id,
1829 ns_state=ns_state,
1830 current_operation="IDLE",
1831 current_operation_id=None,
1832 error_description=error_description
1833 )
1834
1835 self._write_op_status(
1836 op_id=nslcmop_id,
1837 error_message=error_description
1838 )
1839
1840 if db_nslcmop_update:
1841 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1842
1843 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1844
1845 except DbException as e:
1846 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1847
1848 if nslcmop_operation_state:
1849 try:
1850 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1851 "operationState": nslcmop_operation_state},
1852 loop=self.loop)
1853 except Exception as e:
1854 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1855
1856 self.logger.debug(logging_text + "Exit")
1857 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1858
1859 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1860 # Launch kdus if present in the descriptor
1861
1862 deployed_ok = True
1863
1864 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1865
1866 def _get_cluster_id(cluster_id, cluster_type):
1867 nonlocal k8scluster_id_2_uuic
1868 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1869 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1870
1871 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1872 if not db_k8scluster:
1873 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1874 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1875 if not k8s_id:
1876 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1877 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1878 return k8s_id
1879
1880 logging_text += "Deploy kdus: "
1881 try:
1882 db_nsr_update = {"_admin.deployed.K8s": []}
1883 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1884
1885 # Look for all vnfds
1886 pending_tasks = {}
1887 index = 0
1888 for vnfr_data in db_vnfrs.values():
1889 for kdur in get_iterable(vnfr_data, "kdur"):
1890 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1891 kdumodel = None
1892 k8sclustertype = None
1893 error_text = None
1894 cluster_uuid = None
1895 if kdur.get("helm-chart"):
1896 kdumodel = kdur["helm-chart"]
1897 k8sclustertype = "chart"
1898 k8sclustertype_full = "helm-chart"
1899 elif kdur.get("juju-bundle"):
1900 kdumodel = kdur["juju-bundle"]
1901 k8sclustertype = "juju"
1902 k8sclustertype_full = "juju-bundle"
1903 else:
1904 error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
1905 " running"
1906 try:
1907 if not error_text:
1908 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1909 except LcmException as e:
1910 error_text = str(e)
1911 deployed_ok = False
1912
1913 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1914
1915 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1916 "k8scluster-type": k8sclustertype,
1917 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1918 if error_text:
1919 k8s_instace_info["detailed-status"] = error_text
1920 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1921 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1922 if error_text:
1923 continue
1924
1925 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1926 "{}".format(index)}
1927 if k8sclustertype == "chart":
1928 task = asyncio.ensure_future(
1929 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1930 params=desc_params, db_dict=db_dict, timeout=3600)
1931 )
1932 else:
1933 task = asyncio.ensure_future(
1934 self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1935 atomic=True, params=desc_params,
1936 db_dict=db_dict, timeout=600)
1937 )
1938
1939 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1940 index += 1
1941 if not pending_tasks:
1942 return
1943 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1944 pending_list = list(pending_tasks.keys())
1945 while pending_list:
1946 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1947 return_when=asyncio.FIRST_COMPLETED)
1948 if not done_list: # timeout
1949 for task in pending_list:
1950 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1951 deployed_ok = False
1952 break
1953 for task in done_list:
1954 exc = task.exception()
1955 if exc:
1956 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1957 deployed_ok = False
1958 else:
1959 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1960
1961 if not deployed_ok:
1962 raise LcmException('Cannot deploy KDUs')
1963
1964 except Exception as e:
1965 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1966 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1967 finally:
1968 if db_nsr_update:
1969 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1970
1971 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1972 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1973 base_folder, task_instantiation_list, task_instantiation_info):
1974 # launch instantiate_N2VC in a asyncio task and register task object
1975 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1976 # if not found, create one entry and update database
1977
1978 # fill db_nsr._admin.deployed.VCA.<index>
1979 vca_index = -1
1980 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1981 if not vca_deployed:
1982 continue
1983 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1984 vca_deployed.get("vdu_id") == vdu_id and \
1985 vca_deployed.get("kdu_name") == kdu_name and \
1986 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1987 break
1988 else:
1989 # not found, create one.
1990 vca_deployed = {
1991 "member-vnf-index": member_vnf_index,
1992 "vdu_id": vdu_id,
1993 "kdu_name": kdu_name,
1994 "vdu_count_index": vdu_index,
1995 "operational-status": "init", # TODO revise
1996 "detailed-status": "", # TODO revise
1997 "step": "initial-deploy", # TODO revise
1998 "vnfd_id": vnfd_id,
1999 "vdu_name": vdu_name,
2000 }
2001 vca_index += 1
2002
2003 # create VCA and configurationStatus in db
2004 db_dict = {
2005 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2006 "configurationStatus.{}".format(vca_index): dict()
2007 }
2008 self.update_db_2("nsrs", nsr_id, db_dict)
2009
2010 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2011
2012 # Launch task
2013 task_n2vc = asyncio.ensure_future(
2014 self.instantiate_N2VC(
2015 logging_text=logging_text,
2016 vca_index=vca_index,
2017 nsi_id=nsi_id,
2018 db_nsr=db_nsr,
2019 db_vnfr=db_vnfr,
2020 vdu_id=vdu_id,
2021 kdu_name=kdu_name,
2022 vdu_index=vdu_index,
2023 deploy_params=deploy_params,
2024 config_descriptor=descriptor_config,
2025 base_folder=base_folder,
2026 nslcmop_id=nslcmop_id
2027 )
2028 )
2029 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2030 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
2031 task_instantiation_list.append(task_n2vc)
2032
2033 # Check if this VNFD has a configured terminate action
2034 def _has_terminate_config_primitive(self, vnfd):
2035 vnf_config = vnfd.get("vnf-configuration")
2036 if vnf_config and vnf_config.get("terminate-config-primitive"):
2037 return True
2038 else:
2039 return False
2040
2041 @staticmethod
2042 def _get_terminate_config_primitive_seq_list(vnfd):
2043 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2044 # No need to check for existing primitive twice, already done before
2045 vnf_config = vnfd.get("vnf-configuration")
2046 seq_list = vnf_config.get("terminate-config-primitive")
2047 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2048 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2049 return seq_list_sorted
2050
2051 @staticmethod
2052 def _create_nslcmop(nsr_id, operation, params):
2053 """
2054 Creates a ns-lcm-opp content to be stored at database.
2055 :param nsr_id: internal id of the instance
2056 :param operation: instantiate, terminate, scale, action, ...
2057 :param params: user parameters for the operation
2058 :return: dictionary following SOL005 format
2059 """
2060 # Raise exception if invalid arguments
2061 if not (nsr_id and operation and params):
2062 raise LcmException(
2063 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2064 now = time()
2065 _id = str(uuid4())
2066 nslcmop = {
2067 "id": _id,
2068 "_id": _id,
2069 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2070 "operationState": "PROCESSING",
2071 "statusEnteredTime": now,
2072 "nsInstanceId": nsr_id,
2073 "lcmOperationType": operation,
2074 "startTime": now,
2075 "isAutomaticInvocation": False,
2076 "operationParams": params,
2077 "isCancelPending": False,
2078 "links": {
2079 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2080 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2081 }
2082 }
2083 return nslcmop
2084
2085 def _format_additional_params(self, params):
2086 params = params or {}
2087 for key, value in params.items():
2088 if str(value).startswith("!!yaml "):
2089 params[key] = yaml.safe_load(value[7:])
2090 return params
2091
2092 def _get_terminate_primitive_params(self, seq, vnf_index):
2093 primitive = seq.get('name')
2094 primitive_params = {}
2095 params = {
2096 "member_vnf_index": vnf_index,
2097 "primitive": primitive,
2098 "primitive_params": primitive_params,
2099 }
2100 desc_params = {}
2101 return self._map_primitive_params(seq, params, desc_params)
2102
2103 # sub-operations
2104
2105 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2106 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2107 if (op.get('operationState') == 'COMPLETED'):
2108 # b. Skip sub-operation
2109 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2110 return self.SUBOPERATION_STATUS_SKIP
2111 else:
2112 # c. Reintent executing sub-operation
2113 # The sub-operation exists, and operationState != 'COMPLETED'
2114 # Update operationState = 'PROCESSING' to indicate a reintent.
2115 operationState = 'PROCESSING'
2116 detailed_status = 'In progress'
2117 self._update_suboperation_status(
2118 db_nslcmop, op_index, operationState, detailed_status)
2119 # Return the sub-operation index
2120 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2121 # with arguments extracted from the sub-operation
2122 return op_index
2123
2124 # Find a sub-operation where all keys in a matching dictionary must match
2125 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2126 def _find_suboperation(self, db_nslcmop, match):
2127 if (db_nslcmop and match):
2128 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2129 for i, op in enumerate(op_list):
2130 if all(op.get(k) == match[k] for k in match):
2131 return i
2132 return self.SUBOPERATION_STATUS_NOT_FOUND
2133
2134 # Update status for a sub-operation given its index
2135 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2136 # Update DB for HA tasks
2137 q_filter = {'_id': db_nslcmop['_id']}
2138 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2139 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2140 self.db.set_one("nslcmops",
2141 q_filter=q_filter,
2142 update_dict=update_dict,
2143 fail_on_empty=False)
2144
2145 # Add sub-operation, return the index of the added sub-operation
2146 # Optionally, set operationState, detailed-status, and operationType
2147 # Status and type are currently set for 'scale' sub-operations:
2148 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2149 # 'detailed-status' : status message
2150 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2151 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2152 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2153 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2154 RO_nsr_id=None, RO_scaling_info=None):
2155 if not (db_nslcmop):
2156 return self.SUBOPERATION_STATUS_NOT_FOUND
2157 # Get the "_admin.operations" list, if it exists
2158 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2159 op_list = db_nslcmop_admin.get('operations')
2160 # Create or append to the "_admin.operations" list
2161 new_op = {'member_vnf_index': vnf_index,
2162 'vdu_id': vdu_id,
2163 'vdu_count_index': vdu_count_index,
2164 'primitive': primitive,
2165 'primitive_params': mapped_primitive_params}
2166 if operationState:
2167 new_op['operationState'] = operationState
2168 if detailed_status:
2169 new_op['detailed-status'] = detailed_status
2170 if operationType:
2171 new_op['lcmOperationType'] = operationType
2172 if RO_nsr_id:
2173 new_op['RO_nsr_id'] = RO_nsr_id
2174 if RO_scaling_info:
2175 new_op['RO_scaling_info'] = RO_scaling_info
2176 if not op_list:
2177 # No existing operations, create key 'operations' with current operation as first list element
2178 db_nslcmop_admin.update({'operations': [new_op]})
2179 op_list = db_nslcmop_admin.get('operations')
2180 else:
2181 # Existing operations, append operation to list
2182 op_list.append(new_op)
2183
2184 db_nslcmop_update = {'_admin.operations': op_list}
2185 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2186 op_index = len(op_list) - 1
2187 return op_index
2188
2189 # Helper methods for scale() sub-operations
2190
2191 # pre-scale/post-scale:
2192 # Check for 3 different cases:
2193 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2194 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2195 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2196 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2197 operationType, RO_nsr_id=None, RO_scaling_info=None):
2198 # Find this sub-operation
2199 if (RO_nsr_id and RO_scaling_info):
2200 operationType = 'SCALE-RO'
2201 match = {
2202 'member_vnf_index': vnf_index,
2203 'RO_nsr_id': RO_nsr_id,
2204 'RO_scaling_info': RO_scaling_info,
2205 }
2206 else:
2207 match = {
2208 'member_vnf_index': vnf_index,
2209 'primitive': vnf_config_primitive,
2210 'primitive_params': primitive_params,
2211 'lcmOperationType': operationType
2212 }
2213 op_index = self._find_suboperation(db_nslcmop, match)
2214 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2215 # a. New sub-operation
2216 # The sub-operation does not exist, add it.
2217 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2218 # The following parameters are set to None for all kind of scaling:
2219 vdu_id = None
2220 vdu_count_index = None
2221 vdu_name = None
2222 if (RO_nsr_id and RO_scaling_info):
2223 vnf_config_primitive = None
2224 primitive_params = None
2225 else:
2226 RO_nsr_id = None
2227 RO_scaling_info = None
2228 # Initial status for sub-operation
2229 operationState = 'PROCESSING'
2230 detailed_status = 'In progress'
2231 # Add sub-operation for pre/post-scaling (zero or more operations)
2232 self._add_suboperation(db_nslcmop,
2233 vnf_index,
2234 vdu_id,
2235 vdu_count_index,
2236 vdu_name,
2237 vnf_config_primitive,
2238 primitive_params,
2239 operationState,
2240 detailed_status,
2241 operationType,
2242 RO_nsr_id,
2243 RO_scaling_info)
2244 return self.SUBOPERATION_STATUS_NEW
2245 else:
2246 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2247 # or op_index (operationState != 'COMPLETED')
2248 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2249
2250 # Function to return execution_environment id
2251
2252 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2253 for vca in vca_deployed_list:
2254 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2255 return vca["ee_id"]
2256
2257 # Helper methods for terminate()
2258
2259 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2260 """ Create a primitive with params from VNFD
2261 Called from terminate() before deleting instance
2262 Calls action() to execute the primitive """
2263 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2264 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2265 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2266 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2267 db_vnfds = {}
2268 # Loop over VNFRs
2269 for vnfr in db_vnfrs_list:
2270 vnfd_id = vnfr["vnfd-id"]
2271 vnf_index = vnfr["member-vnf-index-ref"]
2272 if vnfd_id not in db_vnfds:
2273 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2274 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2275 db_vnfds[vnfd_id] = vnfd
2276 vnfd = db_vnfds[vnfd_id]
2277 if not self._has_terminate_config_primitive(vnfd):
2278 continue
2279 # Get the primitive's sorted sequence list
2280 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2281 for seq in seq_list:
2282 # For each sequence in list, get primitive and call _ns_execute_primitive()
2283 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2284 vnf_index, seq.get("name"))
2285 self.logger.debug(logging_text + step)
2286 # Create the primitive for each sequence, i.e. "primitive": "touch"
2287 primitive = seq.get('name')
2288 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2289 # The following 3 parameters are currently set to None for 'terminate':
2290 # vdu_id, vdu_count_index, vdu_name
2291 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2292 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2293 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2294 # Add sub-operation
2295 self._add_suboperation(db_nslcmop,
2296 nslcmop_id,
2297 vnf_index,
2298 vdu_id,
2299 vdu_count_index,
2300 vdu_name,
2301 primitive,
2302 mapped_primitive_params)
2303 # Sub-operations: Call _ns_execute_primitive() instead of action()
2304 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2305 # nsr_deployed = db_nsr["_admin"]["deployed"]
2306
2307 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2308 # nsr_id, nslcmop_terminate_action_id)
2309 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2310 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2311 # if result not in result_ok:
2312 # raise LcmException(
2313 # "terminate_primitive_action for vnf_member_index={}",
2314 # " primitive={} fails with error {}".format(
2315 # vnf_index, seq.get("name"), result_detail))
2316
2317 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2318 try:
2319 await self.n2vc.exec_primitive(
2320 ee_id=ee_id,
2321 primitive_name=primitive,
2322 params_dict=mapped_primitive_params
2323 )
2324 except Exception as e:
2325 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2326 raise LcmException(
2327 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2328 .format(vnf_index, seq.get("name"), e),
2329 )
2330
2331 async def _delete_N2VC(self, nsr_id: str):
2332 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2333 namespace = "." + nsr_id
2334 await self.n2vc.delete_namespace(namespace=namespace)
2335 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2336
2337 async def terminate(self, nsr_id, nslcmop_id):
2338
2339 # Try to lock HA task here
2340 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2341 if not task_is_locked_by_me:
2342 return
2343
2344 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2345 self.logger.debug(logging_text + "Enter")
2346 db_nsr = None
2347 db_nslcmop = None
2348 exc = None
2349 failed_detail = [] # annotates all failed error messages
2350 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2351 "_admin.current-operation": nslcmop_id,
2352 "_admin.operation-type": "terminate"}
2353 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2354 db_nslcmop_update = {}
2355 nslcmop_operation_state = None
2356 autoremove = False # autoremove after terminated
2357 pending_tasks = []
2358 try:
2359 # wait for any previous tasks in process
2360 step = "Waiting for previous operations to terminate"
2361 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2362
2363 self._write_ns_status(
2364 nsr_id=nsr_id,
2365 ns_state="TERMINATING",
2366 current_operation="TERMINATING",
2367 current_operation_id=nslcmop_id
2368 )
2369 self._write_op_status(
2370 op_id=nslcmop_id,
2371 queuePosition=0
2372 )
2373
2374 step = "Getting nslcmop={} from db".format(nslcmop_id)
2375 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2376 step = "Getting nsr={} from db".format(nsr_id)
2377 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2378 # nsd = db_nsr["nsd"]
2379 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2380 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2381 return
2382 # #TODO check if VIM is creating and wait
2383 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2384 # Call internal terminate action
2385 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2386
2387 pending_tasks = []
2388
2389 db_nsr_update["operational-status"] = "terminating"
2390 db_nsr_update["config-status"] = "terminating"
2391
2392 # remove NS
2393 try:
2394 step = "delete execution environment"
2395 self.logger.debug(logging_text + step)
2396
2397 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2398 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2399
2400 pending_tasks.append(task_delete_ee)
2401 except Exception as e:
2402 msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
2403 self.logger.error(msg)
2404 failed_detail.append(msg)
2405
2406 try:
2407 # Delete from k8scluster
2408 step = "delete kdus"
2409 self.logger.debug(logging_text + step)
2410 # print(nsr_deployed)
2411 if nsr_deployed:
2412 for kdu in nsr_deployed.get("K8s", ()):
2413 kdu_instance = kdu.get("kdu-instance")
2414 if not kdu_instance:
2415 continue
2416 if kdu.get("k8scluster-type") == "chart":
2417 task_delete_kdu_instance = asyncio.ensure_future(
2418 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2419 kdu_instance=kdu_instance))
2420 elif kdu.get("k8scluster-type") == "juju":
2421 task_delete_kdu_instance = asyncio.ensure_future(
2422 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2423 kdu_instance=kdu_instance))
2424 else:
2425 self.error(logging_text + "Unknown k8s deployment type {}".
2426 format(kdu.get("k8scluster-type")))
2427 continue
2428 pending_tasks.append(task_delete_kdu_instance)
2429 except LcmException as e:
2430 msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
2431 self.logger.error(msg)
2432 failed_detail.append(msg)
2433
2434 # remove from RO
2435 RO_fail = False
2436
2437 # Delete ns
2438 RO_nsr_id = RO_delete_action = None
2439 if nsr_deployed and nsr_deployed.get("RO"):
2440 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2441 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2442 try:
2443 if RO_nsr_id:
2444 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2445 "Deleting ns from VIM"
2446 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2447 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2448 self.logger.debug(logging_text + step)
2449 desc = await self.RO.delete("ns", RO_nsr_id)
2450 RO_delete_action = desc["action_id"]
2451 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2452 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2453 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2454 if RO_delete_action:
2455 # wait until NS is deleted from VIM
2456 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2457 format(RO_nsr_id, RO_delete_action)
2458 detailed_status_old = None
2459 self.logger.debug(logging_text + step)
2460
2461 delete_timeout = 20 * 60 # 20 minutes
2462 while delete_timeout > 0:
2463 desc = await self.RO.show(
2464 "ns",
2465 item_id_name=RO_nsr_id,
2466 extra_item="action",
2467 extra_item_id=RO_delete_action)
2468
2469 # deploymentStatus
2470 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2471
2472 ns_status, ns_status_info = self.RO.check_action_status(desc)
2473 if ns_status == "ERROR":
2474 raise ROclient.ROClientException(ns_status_info)
2475 elif ns_status == "BUILD":
2476 detailed_status = step + "; {}".format(ns_status_info)
2477 elif ns_status == "ACTIVE":
2478 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2479 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2480 break
2481 else:
2482 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2483 if detailed_status != detailed_status_old:
2484 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2485 db_nsr_update["detailed-status"] = detailed_status
2486 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2487 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2488 await asyncio.sleep(5, loop=self.loop)
2489 delete_timeout -= 5
2490 else: # delete_timeout <= 0:
2491 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2492
2493 except ROclient.ROClientException as e:
2494 if e.http_code == 404: # not found
2495 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2496 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2497 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2498 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2499 elif e.http_code == 409: # conflict
2500 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2501 self.logger.debug(logging_text + failed_detail[-1])
2502 RO_fail = True
2503 else:
2504 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2505 self.logger.error(logging_text + failed_detail[-1])
2506 RO_fail = True
2507
2508 # Delete nsd
2509 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2510 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2511 try:
2512 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2513 "Deleting nsd from RO"
2514 await self.RO.delete("nsd", RO_nsd_id)
2515 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2516 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2517 except ROclient.ROClientException as e:
2518 if e.http_code == 404: # not found
2519 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2520 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2521 elif e.http_code == 409: # conflict
2522 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2523 self.logger.debug(logging_text + failed_detail[-1])
2524 RO_fail = True
2525 else:
2526 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2527 self.logger.error(logging_text + failed_detail[-1])
2528 RO_fail = True
2529
2530 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2531 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2532 if not vnf_deployed or not vnf_deployed["id"]:
2533 continue
2534 try:
2535 RO_vnfd_id = vnf_deployed["id"]
2536 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2537 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2538 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2539 await self.RO.delete("vnfd", RO_vnfd_id)
2540 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2541 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2542 except ROclient.ROClientException as e:
2543 if e.http_code == 404: # not found
2544 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2545 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2546 elif e.http_code == 409: # conflict
2547 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2548 self.logger.debug(logging_text + failed_detail[-1])
2549 else:
2550 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2551 self.logger.error(logging_text + failed_detail[-1])
2552
2553 if failed_detail:
2554 terminate_ok = False
2555 self.logger.error(logging_text + " ;".join(failed_detail))
2556 db_nsr_update["operational-status"] = "failed"
2557 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2558 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2559 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2560 db_nslcmop_update["statusEnteredTime"] = time()
2561 else:
2562 terminate_ok = True
2563 db_nsr_update["operational-status"] = "terminated"
2564 db_nsr_update["detailed-status"] = "Done"
2565 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2566 db_nslcmop_update["detailed-status"] = "Done"
2567 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2568 db_nslcmop_update["statusEnteredTime"] = time()
2569 if db_nslcmop["operationParams"].get("autoremove"):
2570 autoremove = True
2571
2572 except (ROclient.ROClientException, DbException, LcmException) as e:
2573 self.logger.error(logging_text + "Exit Exception {}".format(e))
2574 exc = e
2575 except asyncio.CancelledError:
2576 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2577 exc = "Operation was cancelled"
2578 except Exception as e:
2579 exc = traceback.format_exc()
2580 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2581 finally:
2582 if exc and db_nslcmop:
2583 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2584 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2585 db_nslcmop_update["statusEnteredTime"] = time()
2586 try:
2587 if db_nslcmop and db_nslcmop_update:
2588 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2589 if db_nsr:
2590 db_nsr_update["_admin.nslcmop"] = None
2591 db_nsr_update["_admin.current-operation"] = None
2592 db_nsr_update["_admin.operation-type"] = None
2593 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2594
2595 if terminate_ok:
2596 ns_state = "IDLE"
2597 error_description = None
2598 error_detail = None
2599 else:
2600 ns_state = "BROKEN"
2601 error_detail = "; ".join(failed_detail)
2602 error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
2603 .format(nslcmop_id, step, error_detail)
2604
2605 self._write_ns_status(
2606 nsr_id=nsr_id,
2607 ns_state=ns_state,
2608 current_operation="IDLE",
2609 current_operation_id=None,
2610 error_description=error_description
2611 )
2612
2613 self._write_op_status(
2614 op_id=nslcmop_id,
2615 error_message=error_description
2616 )
2617
2618 except DbException as e:
2619 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2620 if nslcmop_operation_state:
2621 try:
2622 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2623 "operationState": nslcmop_operation_state,
2624 "autoremove": autoremove},
2625 loop=self.loop)
2626 except Exception as e:
2627 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2628
2629 # wait for pending tasks
2630 done = None
2631 pending = None
2632 if pending_tasks:
2633 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2634 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2635 if not pending:
2636 self.logger.debug(logging_text + 'All tasks finished...')
2637 else:
2638 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2639
2640 self.logger.debug(logging_text + "Exit")
2641 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2642
2643 @staticmethod
2644 def _map_primitive_params(primitive_desc, params, instantiation_params):
2645 """
2646 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2647 The default-value is used. If it is between < > it look for a value at instantiation_params
2648 :param primitive_desc: portion of VNFD/NSD that describes primitive
2649 :param params: Params provided by user
2650 :param instantiation_params: Instantiation params provided by user
2651 :return: a dictionary with the calculated params
2652 """
2653 calculated_params = {}
2654 for parameter in primitive_desc.get("parameter", ()):
2655 param_name = parameter["name"]
2656 if param_name in params:
2657 calculated_params[param_name] = params[param_name]
2658 elif "default-value" in parameter or "value" in parameter:
2659 if "value" in parameter:
2660 calculated_params[param_name] = parameter["value"]
2661 else:
2662 calculated_params[param_name] = parameter["default-value"]
2663 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2664 and calculated_params[param_name].endswith(">"):
2665 if calculated_params[param_name][1:-1] in instantiation_params:
2666 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2667 else:
2668 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2669 format(calculated_params[param_name], primitive_desc["name"]))
2670 else:
2671 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2672 format(param_name, primitive_desc["name"]))
2673
2674 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2675 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2676 width=256)
2677 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2678 calculated_params[param_name] = calculated_params[param_name][7:]
2679
2680 # add always ns_config_info if primitive name is config
2681 if primitive_desc["name"] == "config":
2682 if "ns_config_info" in instantiation_params:
2683 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2684 return calculated_params
2685
2686 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2687 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2688
2689 # find vca_deployed record for this action
2690 try:
2691 for vca_deployed in db_deployed["VCA"]:
2692 if not vca_deployed:
2693 continue
2694 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2695 continue
2696 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2697 continue
2698 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2699 continue
2700 break
2701 else:
2702 # vca_deployed not found
2703 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2704 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2705
2706 # get ee_id
2707 ee_id = vca_deployed.get("ee_id")
2708 if not ee_id:
2709 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2710 "execution environment"
2711 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2712
2713 if primitive == "config":
2714 primitive_params = {"params": primitive_params}
2715
2716 while retries >= 0:
2717 try:
2718 output = await self.n2vc.exec_primitive(
2719 ee_id=ee_id,
2720 primitive_name=primitive,
2721 params_dict=primitive_params
2722 )
2723 # execution was OK
2724 break
2725 except Exception as e:
2726 retries -= 1
2727 if retries >= 0:
2728 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2729 # wait and retry
2730 await asyncio.sleep(retries_interval, loop=self.loop)
2731 else:
2732 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2733
2734 return output, 'OK'
2735
2736 except Exception as e:
2737 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2738
2739 async def action(self, nsr_id, nslcmop_id):
2740
2741 # Try to lock HA task here
2742 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2743 if not task_is_locked_by_me:
2744 return
2745
2746 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2747 self.logger.debug(logging_text + "Enter")
2748 # get all needed from database
2749 db_nsr = None
2750 db_nslcmop = None
2751 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2752 "_admin.current-operation": nslcmop_id,
2753 "_admin.operation-type": "action"}
2754 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2755 db_nslcmop_update = {}
2756 nslcmop_operation_state = None
2757 nslcmop_operation_state_detail = None
2758 exc = None
2759 try:
2760 # wait for any previous tasks in process
2761 step = "Waiting for previous operations to terminate"
2762 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2763
2764 self._write_ns_status(
2765 nsr_id=nsr_id,
2766 ns_state=None,
2767 current_operation="RUNNING ACTION",
2768 current_operation_id=nslcmop_id
2769 )
2770
2771 step = "Getting information from database"
2772 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2773 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2774
2775 nsr_deployed = db_nsr["_admin"].get("deployed")
2776 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2777 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2778 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2779 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2780 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2781
2782 if vnf_index:
2783 step = "Getting vnfr from database"
2784 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2785 step = "Getting vnfd from database"
2786 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2787 else:
2788 if db_nsr.get("nsd"):
2789 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2790 else:
2791 step = "Getting nsd from database"
2792 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2793
2794 # for backward compatibility
2795 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2796 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2797 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2798 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2799
2800 primitive = db_nslcmop["operationParams"]["primitive"]
2801 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2802
2803 # look for primitive
2804 config_primitive_desc = None
2805 if vdu_id:
2806 for vdu in get_iterable(db_vnfd, "vdu"):
2807 if vdu_id == vdu["id"]:
2808 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2809 if config_primitive["name"] == primitive:
2810 config_primitive_desc = config_primitive
2811 break
2812 elif kdu_name:
2813 self.logger.debug(logging_text + "Checking actions in KDUs")
2814 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2815 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2816 if primitive_params:
2817 desc_params.update(primitive_params)
2818 # TODO Check if we will need something at vnf level
2819 index = 0
2820 for kdu in get_iterable(nsr_deployed, "K8s"):
2821 if kdu_name == kdu["kdu-name"]:
2822 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2823 "path": "_admin.deployed.K8s.{}".format(index)}
2824 if primitive == "upgrade":
2825 if desc_params.get("kdu_model"):
2826 kdu_model = desc_params.get("kdu_model")
2827 del desc_params["kdu_model"]
2828 else:
2829 kdu_model = kdu.get("kdu-model")
2830 parts = kdu_model.split(sep=":")
2831 if len(parts) == 2:
2832 kdu_model = parts[0]
2833
2834 if kdu.get("k8scluster-type") == "chart":
2835 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2836 kdu_instance=kdu.get("kdu-instance"),
2837 atomic=True, kdu_model=kdu_model,
2838 params=desc_params, db_dict=db_dict,
2839 timeout=300)
2840 elif kdu.get("k8scluster-type") == "juju":
2841 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2842 kdu_instance=kdu.get("kdu-instance"),
2843 atomic=True, kdu_model=kdu_model,
2844 params=desc_params, db_dict=db_dict,
2845 timeout=300)
2846
2847 else:
2848 msg = "k8scluster-type not defined"
2849 raise LcmException(msg)
2850
2851 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2852 break
2853 elif primitive == "rollback":
2854 if kdu.get("k8scluster-type") == "chart":
2855 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2856 kdu_instance=kdu.get("kdu-instance"),
2857 db_dict=db_dict)
2858 elif kdu.get("k8scluster-type") == "juju":
2859 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2860 kdu_instance=kdu.get("kdu-instance"),
2861 db_dict=db_dict)
2862 else:
2863 msg = "k8scluster-type not defined"
2864 raise LcmException(msg)
2865 break
2866 elif primitive == "status":
2867 if kdu.get("k8scluster-type") == "chart":
2868 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2869 kdu_instance=kdu.get("kdu-instance"))
2870 elif kdu.get("k8scluster-type") == "juju":
2871 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2872 kdu_instance=kdu.get("kdu-instance"))
2873 else:
2874 msg = "k8scluster-type not defined"
2875 raise LcmException(msg)
2876 break
2877 index += 1
2878
2879 else:
2880 raise LcmException("KDU '{}' not found".format(kdu_name))
2881 if output:
2882 db_nslcmop_update["detailed-status"] = output
2883 db_nslcmop_update["operationState"] = 'COMPLETED'
2884 db_nslcmop_update["statusEnteredTime"] = time()
2885 else:
2886 db_nslcmop_update["detailed-status"] = ''
2887 db_nslcmop_update["operationState"] = 'FAILED'
2888 db_nslcmop_update["statusEnteredTime"] = time()
2889 return
2890 elif vnf_index:
2891 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2892 if config_primitive["name"] == primitive:
2893 config_primitive_desc = config_primitive
2894 break
2895 else:
2896 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2897 if config_primitive["name"] == primitive:
2898 config_primitive_desc = config_primitive
2899 break
2900
2901 if not config_primitive_desc:
2902 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2903 format(primitive))
2904
2905 desc_params = {}
2906 if vnf_index:
2907 if db_vnfr.get("additionalParamsForVnf"):
2908 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2909 if vdu_id:
2910 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2911 if vdur.get("additionalParams"):
2912 desc_params = self._format_additional_params(vdur["additionalParams"])
2913 else:
2914 if db_nsr.get("additionalParamsForNs"):
2915 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2916
2917 # TODO check if ns is in a proper status
2918 output, detail = await self._ns_execute_primitive(
2919 db_deployed=nsr_deployed,
2920 member_vnf_index=vnf_index,
2921 vdu_id=vdu_id,
2922 vdu_name=vdu_name,
2923 vdu_count_index=vdu_count_index,
2924 primitive=primitive,
2925 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2926
2927 detailed_status = output
2928 if detail == 'OK':
2929 result = 'COMPLETED'
2930 else:
2931 result = 'FAILED'
2932
2933 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2934 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2935 db_nslcmop_update["statusEnteredTime"] = time()
2936 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2937 return # database update is called inside finally
2938
2939 except (DbException, LcmException) as e:
2940 self.logger.error(logging_text + "Exit Exception {}".format(e))
2941 exc = e
2942 except asyncio.CancelledError:
2943 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2944 exc = "Operation was cancelled"
2945 except Exception as e:
2946 exc = traceback.format_exc()
2947 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2948 finally:
2949 if exc and db_nslcmop:
2950 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2951 "FAILED {}: {}".format(step, exc)
2952 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2953 db_nslcmop_update["statusEnteredTime"] = time()
2954 try:
2955 if db_nslcmop_update:
2956 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2957 if db_nsr:
2958 db_nsr_update["_admin.nslcmop"] = None
2959 db_nsr_update["_admin.operation-type"] = None
2960 db_nsr_update["_admin.nslcmop"] = None
2961 db_nsr_update["_admin.current-operation"] = None
2962 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2963 self._write_ns_status(
2964 nsr_id=nsr_id,
2965 ns_state=None,
2966 current_operation="IDLE",
2967 current_operation_id=None
2968 )
2969 if exc:
2970 self._write_op_status(
2971 op_id=nslcmop_id,
2972 error_message=nslcmop_operation_state_detail
2973 )
2974 except DbException as e:
2975 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2976 self.logger.debug(logging_text + "Exit")
2977 if nslcmop_operation_state:
2978 try:
2979 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2980 "operationState": nslcmop_operation_state},
2981 loop=self.loop)
2982 except Exception as e:
2983 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2984 self.logger.debug(logging_text + "Exit")
2985 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2986 return nslcmop_operation_state, nslcmop_operation_state_detail
2987
2988 async def scale(self, nsr_id, nslcmop_id):
2989
2990 # Try to lock HA task here
2991 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2992 if not task_is_locked_by_me:
2993 return
2994
2995 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2996 self.logger.debug(logging_text + "Enter")
2997 # get all needed from database
2998 db_nsr = None
2999 db_nslcmop = None
3000 db_nslcmop_update = {}
3001 nslcmop_operation_state = None
3002 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
3003 "_admin.current-operation": nslcmop_id,
3004 "_admin.operation-type": "scale"}
3005 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3006 exc = None
3007 # in case of error, indicates what part of scale was failed to put nsr at error status
3008 scale_process = None
3009 old_operational_status = ""
3010 old_config_status = ""
3011 vnfr_scaled = False
3012 try:
3013 # wait for any previous tasks in process
3014 step = "Waiting for previous operations to terminate"
3015 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3016
3017 self._write_ns_status(
3018 nsr_id=nsr_id,
3019 ns_state=None,
3020 current_operation="SCALING",
3021 current_operation_id=nslcmop_id
3022 )
3023
3024 step = "Getting nslcmop from database"
3025 self.logger.debug(step + " after having waited for previous tasks to be completed")
3026 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3027 step = "Getting nsr from database"
3028 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3029
3030 old_operational_status = db_nsr["operational-status"]
3031 old_config_status = db_nsr["config-status"]
3032 step = "Parsing scaling parameters"
3033 # self.logger.debug(step)
3034 db_nsr_update["operational-status"] = "scaling"
3035 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3036 nsr_deployed = db_nsr["_admin"].get("deployed")
3037
3038 #######
3039 nsr_deployed = db_nsr["_admin"].get("deployed")
3040 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3041 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3042 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3043 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3044 #######
3045
3046 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3047 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3048 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3049 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3050 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3051
3052 # for backward compatibility
3053 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3054 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3055 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3056 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3057
3058 step = "Getting vnfr from database"
3059 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3060 step = "Getting vnfd from database"
3061 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3062
3063 step = "Getting scaling-group-descriptor"
3064 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3065 if scaling_descriptor["name"] == scaling_group:
3066 break
3067 else:
3068 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3069 "at vnfd:scaling-group-descriptor".format(scaling_group))
3070
3071 # cooldown_time = 0
3072 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3073 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3074 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3075 # break
3076
3077 # TODO check if ns is in a proper status
3078 step = "Sending scale order to VIM"
3079 nb_scale_op = 0
3080 if not db_nsr["_admin"].get("scaling-group"):
3081 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3082 admin_scale_index = 0
3083 else:
3084 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3085 if admin_scale_info["name"] == scaling_group:
3086 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3087 break
3088 else: # not found, set index one plus last element and add new entry with the name
3089 admin_scale_index += 1
3090 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3091 RO_scaling_info = []
3092 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3093 if scaling_type == "SCALE_OUT":
3094 # count if max-instance-count is reached
3095 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3096 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3097 if nb_scale_op >= max_instance_count:
3098 raise LcmException("reached the limit of {} (max-instance-count) "
3099 "scaling-out operations for the "
3100 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3101
3102 nb_scale_op += 1
3103 vdu_scaling_info["scaling_direction"] = "OUT"
3104 vdu_scaling_info["vdu-create"] = {}
3105 for vdu_scale_info in scaling_descriptor["vdu"]:
3106 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3107 "type": "create", "count": vdu_scale_info.get("count", 1)})
3108 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3109
3110 elif scaling_type == "SCALE_IN":
3111 # count if min-instance-count is reached
3112 min_instance_count = 0
3113 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3114 min_instance_count = int(scaling_descriptor["min-instance-count"])
3115 if nb_scale_op <= min_instance_count:
3116 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3117 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3118 nb_scale_op -= 1
3119 vdu_scaling_info["scaling_direction"] = "IN"
3120 vdu_scaling_info["vdu-delete"] = {}
3121 for vdu_scale_info in scaling_descriptor["vdu"]:
3122 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3123 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3124 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3125
3126 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3127 vdu_create = vdu_scaling_info.get("vdu-create")
3128 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3129 if vdu_scaling_info["scaling_direction"] == "IN":
3130 for vdur in reversed(db_vnfr["vdur"]):
3131 if vdu_delete.get(vdur["vdu-id-ref"]):
3132 vdu_delete[vdur["vdu-id-ref"]] -= 1
3133 vdu_scaling_info["vdu"].append({
3134 "name": vdur["name"],
3135 "vdu_id": vdur["vdu-id-ref"],
3136 "interface": []
3137 })
3138 for interface in vdur["interfaces"]:
3139 vdu_scaling_info["vdu"][-1]["interface"].append({
3140 "name": interface["name"],
3141 "ip_address": interface["ip-address"],
3142 "mac_address": interface.get("mac-address"),
3143 })
3144 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3145
3146 # PRE-SCALE BEGIN
3147 step = "Executing pre-scale vnf-config-primitive"
3148 if scaling_descriptor.get("scaling-config-action"):
3149 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3150 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3151 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3152 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3153 step = db_nslcmop_update["detailed-status"] = \
3154 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3155
3156 # look for primitive
3157 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3158 if config_primitive["name"] == vnf_config_primitive:
3159 break
3160 else:
3161 raise LcmException(
3162 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3163 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3164 "primitive".format(scaling_group, config_primitive))
3165
3166 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3167 if db_vnfr.get("additionalParamsForVnf"):
3168 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3169
3170 scale_process = "VCA"
3171 db_nsr_update["config-status"] = "configuring pre-scaling"
3172 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3173
3174 # Pre-scale reintent check: Check if this sub-operation has been executed before
3175 op_index = self._check_or_add_scale_suboperation(
3176 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3177 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3178 # Skip sub-operation
3179 result = 'COMPLETED'
3180 result_detail = 'Done'
3181 self.logger.debug(logging_text +
3182 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3183 vnf_config_primitive, result, result_detail))
3184 else:
3185 if (op_index == self.SUBOPERATION_STATUS_NEW):
3186 # New sub-operation: Get index of this sub-operation
3187 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3188 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3189 format(vnf_config_primitive))
3190 else:
3191 # Reintent: Get registered params for this existing sub-operation
3192 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3193 vnf_index = op.get('member_vnf_index')
3194 vnf_config_primitive = op.get('primitive')
3195 primitive_params = op.get('primitive_params')
3196 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3197 format(vnf_config_primitive))
3198 # Execute the primitive, either with new (first-time) or registered (reintent) args
3199 result, result_detail = await self._ns_execute_primitive(
3200 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3201 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3202 vnf_config_primitive, result, result_detail))
3203 # Update operationState = COMPLETED | FAILED
3204 self._update_suboperation_status(
3205 db_nslcmop, op_index, result, result_detail)
3206
3207 if result == "FAILED":
3208 raise LcmException(result_detail)
3209 db_nsr_update["config-status"] = old_config_status
3210 scale_process = None
3211 # PRE-SCALE END
3212
3213 # SCALE RO - BEGIN
3214 # Should this block be skipped if 'RO_nsr_id' == None ?
3215 # if (RO_nsr_id and RO_scaling_info):
3216 if RO_scaling_info:
3217 scale_process = "RO"
3218 # Scale RO reintent check: Check if this sub-operation has been executed before
3219 op_index = self._check_or_add_scale_suboperation(
3220 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3221 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3222 # Skip sub-operation
3223 result = 'COMPLETED'
3224 result_detail = 'Done'
3225 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3226 result, result_detail))
3227 else:
3228 if (op_index == self.SUBOPERATION_STATUS_NEW):
3229 # New sub-operation: Get index of this sub-operation
3230 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3231 self.logger.debug(logging_text + "New sub-operation RO")
3232 else:
3233 # Reintent: Get registered params for this existing sub-operation
3234 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3235 RO_nsr_id = op.get('RO_nsr_id')
3236 RO_scaling_info = op.get('RO_scaling_info')
3237 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3238 vnf_config_primitive))
3239
3240 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3241 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3242 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3243 # wait until ready
3244 RO_nslcmop_id = RO_desc["instance_action_id"]
3245 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3246
3247 RO_task_done = False
3248 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3249 detailed_status_old = None
3250 self.logger.debug(logging_text + step)
3251
3252 deployment_timeout = 1 * 3600 # One hour
3253 while deployment_timeout > 0:
3254 if not RO_task_done:
3255 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3256 extra_item_id=RO_nslcmop_id)
3257
3258 # deploymentStatus
3259 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3260
3261 ns_status, ns_status_info = self.RO.check_action_status(desc)
3262 if ns_status == "ERROR":
3263 raise ROclient.ROClientException(ns_status_info)
3264 elif ns_status == "BUILD":
3265 detailed_status = step + "; {}".format(ns_status_info)
3266 elif ns_status == "ACTIVE":
3267 RO_task_done = True
3268 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3269 self.logger.debug(logging_text + step)
3270 else:
3271 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3272 else:
3273
3274 if ns_status == "ERROR":
3275 raise ROclient.ROClientException(ns_status_info)
3276 elif ns_status == "BUILD":
3277 detailed_status = step + "; {}".format(ns_status_info)
3278 elif ns_status == "ACTIVE":
3279 step = detailed_status = \
3280 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3281 if not vnfr_scaled:
3282 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3283 vnfr_scaled = True
3284 try:
3285 desc = await self.RO.show("ns", RO_nsr_id)
3286
3287 # deploymentStatus
3288 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3289
3290 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3291 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3292 break
3293 except LcmExceptionNoMgmtIP:
3294 pass
3295 else:
3296 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3297 if detailed_status != detailed_status_old:
3298 self._update_suboperation_status(
3299 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3300 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3301 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3302
3303 await asyncio.sleep(5, loop=self.loop)
3304 deployment_timeout -= 5
3305 if deployment_timeout <= 0:
3306 self._update_suboperation_status(
3307 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3308 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3309
3310 # update VDU_SCALING_INFO with the obtained ip_addresses
3311 if vdu_scaling_info["scaling_direction"] == "OUT":
3312 for vdur in reversed(db_vnfr["vdur"]):
3313 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3314 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3315 vdu_scaling_info["vdu"].append({
3316 "name": vdur["name"],
3317 "vdu_id": vdur["vdu-id-ref"],
3318 "interface": []
3319 })
3320 for interface in vdur["interfaces"]:
3321 vdu_scaling_info["vdu"][-1]["interface"].append({
3322 "name": interface["name"],
3323 "ip_address": interface["ip-address"],
3324 "mac_address": interface.get("mac-address"),
3325 })
3326 del vdu_scaling_info["vdu-create"]
3327
3328 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3329 # SCALE RO - END
3330
3331 scale_process = None
3332 if db_nsr_update:
3333 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3334
3335 # POST-SCALE BEGIN
3336 # execute primitive service POST-SCALING
3337 step = "Executing post-scale vnf-config-primitive"
3338 if scaling_descriptor.get("scaling-config-action"):
3339 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3340 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3341 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3342 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3343 step = db_nslcmop_update["detailed-status"] = \
3344 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3345
3346 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3347 if db_vnfr.get("additionalParamsForVnf"):
3348 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3349
3350 # look for primitive
3351 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3352 if config_primitive["name"] == vnf_config_primitive:
3353 break
3354 else:
3355 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3356 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3357 "match any vnf-configuration:config-primitive".format(scaling_group,
3358 config_primitive))
3359 scale_process = "VCA"
3360 db_nsr_update["config-status"] = "configuring post-scaling"
3361 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3362
3363 # Post-scale reintent check: Check if this sub-operation has been executed before
3364 op_index = self._check_or_add_scale_suboperation(
3365 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3366 if op_index == self.SUBOPERATION_STATUS_SKIP:
3367 # Skip sub-operation
3368 result = 'COMPLETED'
3369 result_detail = 'Done'
3370 self.logger.debug(logging_text +
3371 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3372 format(vnf_config_primitive, result, result_detail))
3373 else:
3374 if op_index == self.SUBOPERATION_STATUS_NEW:
3375 # New sub-operation: Get index of this sub-operation
3376 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3377 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3378 format(vnf_config_primitive))
3379 else:
3380 # Reintent: Get registered params for this existing sub-operation
3381 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3382 vnf_index = op.get('member_vnf_index')
3383 vnf_config_primitive = op.get('primitive')
3384 primitive_params = op.get('primitive_params')
3385 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3386 format(vnf_config_primitive))
3387 # Execute the primitive, either with new (first-time) or registered (reintent) args
3388 result, result_detail = await self._ns_execute_primitive(
3389 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3390 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3391 vnf_config_primitive, result, result_detail))
3392 # Update operationState = COMPLETED | FAILED
3393 self._update_suboperation_status(
3394 db_nslcmop, op_index, result, result_detail)
3395
3396 if result == "FAILED":
3397 raise LcmException(result_detail)
3398 db_nsr_update["config-status"] = old_config_status
3399 scale_process = None
3400 # POST-SCALE END
3401
3402 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3403 db_nslcmop_update["statusEnteredTime"] = time()
3404 db_nslcmop_update["detailed-status"] = "done"
3405 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3406 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3407 else old_operational_status
3408 db_nsr_update["config-status"] = old_config_status
3409 return
3410 except (ROclient.ROClientException, DbException, LcmException) as e:
3411 self.logger.error(logging_text + "Exit Exception {}".format(e))
3412 exc = e
3413 except asyncio.CancelledError:
3414 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3415 exc = "Operation was cancelled"
3416 except Exception as e:
3417 exc = traceback.format_exc()
3418 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3419 finally:
3420 self._write_ns_status(
3421 nsr_id=nsr_id,
3422 ns_state=None,
3423 current_operation="IDLE",
3424 current_operation_id=None
3425 )
3426 if exc:
3427 if db_nslcmop:
3428 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3429 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3430 db_nslcmop_update["statusEnteredTime"] = time()
3431 if db_nsr:
3432 db_nsr_update["operational-status"] = old_operational_status
3433 db_nsr_update["config-status"] = old_config_status
3434 db_nsr_update["detailed-status"] = ""
3435 db_nsr_update["_admin.nslcmop"] = None
3436 if scale_process:
3437 if "VCA" in scale_process:
3438 db_nsr_update["config-status"] = "failed"
3439 if "RO" in scale_process:
3440 db_nsr_update["operational-status"] = "failed"
3441 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3442 exc)
3443 try:
3444 if db_nslcmop and db_nslcmop_update:
3445 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3446 if db_nsr:
3447 db_nsr_update["_admin.current-operation"] = None
3448 db_nsr_update["_admin.operation-type"] = None
3449 db_nsr_update["_admin.nslcmop"] = None
3450 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3451
3452 self._write_ns_status(
3453 nsr_id=nsr_id,
3454 ns_state=None,
3455 current_operation="IDLE",
3456 current_operation_id=None
3457 )
3458
3459 except DbException as e:
3460 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3461 if nslcmop_operation_state:
3462 try:
3463 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3464 "operationState": nslcmop_operation_state},
3465 loop=self.loop)
3466 # if cooldown_time:
3467 # await asyncio.sleep(cooldown_time, loop=self.loop)
3468 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3469 except Exception as e:
3470 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3471 self.logger.debug(logging_text + "Exit")
3472 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")