Fix bug 970. Changes in NS and operation status
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
def get_iterable(in_dict, in_key):
    """
    Look up in_key at in_dict and return something that is always safe to iterate: any missing or falsy value
    (None, False, empty list, ...) is replaced by an empty tuple
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key] or () if it is falsy or not present
    """
    found = in_dict.get(in_key)
    return found if found else ()
56
57
def populate_dict(target_dict, key_list, value):
    """
    Walk target_dict following key_list, creating the intermediate nested dictionaries that do not exist yet,
    and assign value to the last key of key_list.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed in place
    :param key_list: list (or tuple) of keys to insert at target_dict. It must contain at least one key
    :param value: content assigned to the innermost key
    :return: None
    """
    node = target_dict
    # every key but the last one selects (or creates) a nested dictionary
    for parent_key in key_list[:-1]:
        node = node.setdefault(parent_key, {})
    node[key_list[-1]] = value
72
73
74 class NsLcm(LcmBase):
# Timeouts used by the NS operations.
# NOTE(review): the expressions (5 * 60, 2 * 3600, ...) suggest the unit is seconds - confirm against the callers
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
total_deploy_timeout = 2 * 3600  # global timeout for deployment
timeout_charm_delete = 10 * 60  # presumably the maximum time to wait for a charm deletion - TODO confirm
timeout_primitive = 10 * 60  # timeout for primitive execution

# Negative sentinel values related to sub-operations. Their users are outside this section of the file;
# presumably they are returned instead of a valid (non negative) sub-operation index - TODO confirm
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
83
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init: stores the shared services and creates the N2VC (juju), K8s helm, K8s juju and RO connectors
    :param db: database connection; passed to the base class and to the connectors
    :param msg: messaging connection; passed to the base class
    :param fs: filesystem storage; passed to the base class and to the connectors
    :param lcm_tasks: stored at self.lcm_tasks
    :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
    :param vca_config: dictionary with the VCA configuration. It is modified in place: the keys 'pubkey',
        'cacert', 'apiproxy', 'enableosupgrade' and 'aptmirror' are duplicated under the names 'public_key',
        'ca_cert', 'api_proxy', 'enable_os_upgrade' and 'apt_mirror'. It must contain 'host' and 'port'
    :param loop: event loop; passed to the N2VC connector and to the RO client
    :return: None
    """
    super().__init__(db=db, msg=msg, fs=fs, logger=logging.getLogger('lcm.ns'))

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.ro_config = ro_config
    self.vca_config = vca_config

    # publish the configuration keys also under the underscore-separated names
    for config_key, alias in (('pubkey', 'public_key'), ('cacert', 'ca_cert'), ('apiproxy', 'api_proxy')):
        if config_key in self.vca_config:
            self.vca_config[alias] = self.vca_config[config_key]
    if 'enableosupgrade' in self.vca_config:
        # textual boolean: anything different from 'true'/'false' leaves 'enable_os_upgrade' unset
        os_upgrade = self.vca_config['enableosupgrade'].lower()
        if os_upgrade in ('false', 'true'):
            self.vca_config['enable_os_upgrade'] = os_upgrade == 'true'
    if 'aptmirror' in self.vca_config:
        self.vca_config['apt_mirror'] = self.vca_config['aptmirror']

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=self.vca_config.get('user', None),
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db
    )

    # both K8s connectors share the kubectl binary and do not report database updates
    kubectl_path = self.vca_config.get("kubectlpath")
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
147
148 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
149
150 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
151
152 try:
153 # TODO filter RO descriptor fields...
154
155 # write to database
156 db_dict = dict()
157 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
158 db_dict['deploymentStatus'] = ro_descriptor
159 self.update_db_2("nsrs", nsrs_id, db_dict)
160
161 except Exception as e:
162 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
163
164 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
165
166 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
167 # .format(table, filter, path, updated_data))
168
169 try:
170
171 nsr_id = filter.get('_id')
172
173 # read ns record from database
174 nsr = self.db.get_one(table='nsrs', q_filter=filter)
175 current_ns_status = nsr.get('nsState')
176
177 # get vca status for NS
178 # status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
179 # TEMPORAL
180 status_dict = str(await self.n2vc.get_status(namespace='.' + nsr_id))
181
182 # vcaStatus
183 db_dict = dict()
184 db_dict['vcaStatus'] = status_dict
185
186 # update configurationStatus for this VCA
187 try:
188 vca_index = int(path[path.rfind(".")+1:])
189
190 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
191 vca_status = vca_list[vca_index].get('status')
192
193 configuration_status_list = nsr.get('configurationStatus')
194 config_status = configuration_status_list[vca_index].get('status')
195
196 if config_status == 'BROKEN' and vca_status != 'failed':
197 db_dict['configurationStatus'][vca_index] = 'READY'
198 elif config_status != 'BROKEN' and vca_status == 'failed':
199 db_dict['configurationStatus'][vca_index] = 'BROKEN'
200 except Exception as e:
201 # not update configurationStatus
202 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
203
204 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
205 # if nsState = 'DEGRADED' check if all is OK
206 is_degraded = False
207 if current_ns_status in ('READY', 'DEGRADED'):
208 error_description = ''
209 # check machines
210 if status_dict.get('machines'):
211 for machine_id in status_dict.get('machines'):
212 machine = status_dict.get('machines').get(machine_id)
213 # check machine agent-status
214 if machine.get('agent-status'):
215 s = machine.get('agent-status').get('status')
216 if s != 'started':
217 is_degraded = True
218 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
219 # check machine instance status
220 if machine.get('instance-status'):
221 s = machine.get('instance-status').get('status')
222 if s != 'running':
223 is_degraded = True
224 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
225 # check applications
226 if status_dict.get('applications'):
227 for app_id in status_dict.get('applications'):
228 app = status_dict.get('applications').get(app_id)
229 # check application status
230 if app.get('status'):
231 s = app.get('status').get('status')
232 if s != 'active':
233 is_degraded = True
234 error_description += 'application {} status={} ; '.format(app_id, s)
235
236 if error_description:
237 db_dict['errorDescription'] = error_description
238 if current_ns_status == 'READY' and is_degraded:
239 db_dict['nsState'] = 'DEGRADED'
240 if current_ns_status == 'DEGRADED' and not is_degraded:
241 db_dict['nsState'] = 'READY'
242
243 # write to database
244 self.update_db_2("nsrs", nsr_id, db_dict)
245
246 except Exception as e:
247 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
248
249 return
250
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Generates the vnfd descriptor for RO out of the OSM IM vnfd: the keys not used by RO are removed and the
    cloud-init (or the content of cloud-init-file) of every vdu is rendered with Jinja2
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as variables of the Jinja2 templates
    :param nsrId: Id of the NSR. Not used by this method
    :return: modified copy of vnfd. LcmException is raised if the cloud-init file cannot be read, if the
        template is wrong or if it uses a variable not provided at additionalParams
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for ignored_key in ("_id", "_admin", "vnf-configuration", "monitoring-param",
                            "scaling-group-descriptor", "kdu", "k8s-cluster"):
            vnfd_RO.pop(ignored_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file is read from the package stored at the filesystem storage
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render at this vdu
                continue

            # every variable referenced by the template must come in the instantiation parameters
            parsed_content = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_content):
                if not additionalParams or var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
308
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map each member-vnf-index to its vnfd
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd content, of the vnfds used by this ns
    :param n2vc_key_list: keys injected as 'mgmt_keys' at the vdus that require ssh access. It can be None
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM or WIM
        account is not ENABLED, or if the params reference something that is not present at the descriptors
    """
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the id used at RO. Results are cached at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # translates an OSM wim account id into the id used at RO. Results are cached at wim_2_RO.
        # Anything that is not a string (None, False, ...) is returned untouched
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the keys and values renamed to the ones used by RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the management keys at the vdus whose configuration requires ssh access
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in get_iterable(vnfd, "vdu"):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # the management connection point selects the first vdu with an interface attached to it.
                # get_iterable protects against a vdu without interfaces
                for vdu_interface in get_iterable(vdu, "interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu of the vnfd contains an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))

        # both parameters fill the same "sites" entry, so "vim-network-id" prevails when both are provided
        for vim_net_key in ("vim-network-name", "vim-network-id"):
            vim_net_value = vld_params.get(vim_net_key)
            if not vim_net_value:
                continue
            if isinstance(vim_net_value, dict):
                # one network per vim account
                RO_vld_sites = [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                                for vim_account, vim_net in vim_net_value.items()]
            else:  # isinstance str
                RO_vld_sites = [{"netmap-use": vim_net_value}]
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor point here to the matching vdu and interface
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
579
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and writes the resulting 'vdur' list to the 'vnfrs' table
    :param db_vnfr: content of the vnfr. Its 'vdur' list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur entries to add. New entries are copies of the
        last existing vdur with that vdu-id-ref, with a new '_id' and consecutive 'count-index'. Not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur entries to remove, starting by the last one.
        Not modified
    :return: None. LcmException is raised if some requested creation or deletion cannot be done because there
        are not (enough) vdur entries with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk the list backwards, because entries are inserted and deleted while iterating
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new entry is a copy of the previous one, so count-index keeps growing
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each pending dictionary is reported with its own message: testing
    # 'vdu_create or vdu_delete' here would report pending deletions as a scaling OUT error
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
618
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the information of the matching net created by RO (vim id, name, status and
    error message), and registers the change to be written at database
    :param ns_update_nsr: dictionary to be filled with the updated info, using the keys 'vld.<index>'
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO. The nets are matched by its 'ns_net_osm_id'
    :return: Nothing, LcmException is raised on errors
    """
    RO_nets = get_iterable(nsr_desc_RO, "nets")
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net created for this vld, if any
        matching_net = next((net for net in RO_nets if net.get("ns_net_osm_id") == vld["id"]), None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld.update({
            "vim-id": matching_net.get("vim_net_id"),
            "name": matching_net.get("vim_name"),
            "status": matching_net.get("status"),
            "status-detailed": matching_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
640
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' (with their 'vms') and 'nets' are used
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised if RO does not provide
        an ip address for a vnf and the vnfr does not contain one yet
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf with the same member vnf index; the final 'else' raises if there is none
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # changes of this vnfr, written to database in a single operation at the end
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO can provide several addresses separated by ';'. Only the first one is used
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with the same vdu id already skipped; it is used to pair the vdur with
                # 'count-index' N with the N-th RO vm of that vdu
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    # vdur entries with 'pdu-type' are not looked for at the RO info
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every interface of the vdur must be found at RO, matching by its internal name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm matches this vdur (vdu id and count index)
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched with the RO nets by 'vnf_net_osm_id'
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
712
713 def _get_ns_config_info(self, nsr_id):
714 """
715 Generates a mapping between vnf,vdu elements and the N2VC id
716 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
717 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
718 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
719 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
720 """
721 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
722 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
723 mapping = {}
724 ns_config_info = {"osm-config-mapping": mapping}
725 for vca in vca_deployed_list:
726 if not vca["member-vnf-index"]:
727 continue
728 if not vca["vdu_id"]:
729 mapping[vca["member-vnf-index"]] = vca["application"]
730 else:
731 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
732 vca["application"]
733 return ns_config_info
734
735 @staticmethod
736 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
737 """
738 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
739 primitives as verify-ssh-credentials, or config when needed
740 :param desc_primitive_list: information of the descriptor
741 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
742 this element contains a ssh public key
743 :return: The modified list. Can ba an empty list, but always a list
744 """
745 if desc_primitive_list:
746 primitive_list = desc_primitive_list.copy()
747 else:
748 primitive_list = []
749 # look for primitive config, and get the position. None if not present
750 config_position = None
751 for index, primitive in enumerate(primitive_list):
752 if primitive["name"] == "config":
753 config_position = index
754 break
755
756 # for NS, add always a config primitive if not present (bug 874)
757 if not vca_deployed["member-vnf-index"] and config_position is None:
758 primitive_list.insert(0, {"name": "config", "parameter": []})
759 config_position = 0
760 # for VNF/VDU add verify-ssh-credentials after config
761 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
762 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
763 return primitive_list
764
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
                         db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
    """
    Create at RO the vnfd descriptors, the nsd descriptor and the ns instance, and wait until RO reports the
    ns as ACTIVE. Progress is written to the "nsrs" database record through self.update_db_2.
    :param logging_text: prefix use for logging
    :param nsr_id: _id of the "nsrs" record that is updated
    :param nsd: ns descriptor content. "id", "constituent-vnfd" and "vld" are read
    :param db_nsr: content of the nsrs record. Section _admin.deployed.RO is read and updated in place
    :param db_nslcmop: content of the nslcmops record. Only "operationParams" is read
    :param db_vnfrs: vnfr contents indexed by member-vnf-index
    :param db_vnfds_ref: vnfd contents indexed by the 'vnfd-id-ref' used at the nsd
    :param n2vc_key_list: passed without changes to self.ns_params_2_RO
    :return: None
    :raises ROclient.ROClientException: if RO reports the ns in ERROR, or self.total_deploy_timeout seconds
        elapse (counted from the beginning of this method) without the ns being ready
    """

    db_nsr_update = {}
    RO_descriptor_number = 0   # number of descriptors created at RO
    vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
    start_deploy = time()
    vdu_flag = False  # If any of the VNFDs has VDUs
    ns_params = db_nslcmop.get("operationParams")

    # deploy RO

    # get vnfds, instantiate at RO

    for c_vnf in nsd.get("constituent-vnfd", ()):
        member_vnf_index = c_vnf["member-vnf-index"]
        vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
        if vnfd.get("vdu"):
            vdu_flag = True
        vnfd_ref = vnfd["id"]
        step = db_nsr_update["_admin.deployed.RO.detailed-status"] = \
            "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
        # self.logger.debug(logging_text + step)
        vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
        vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
        RO_descriptor_number += 1

        # look position at deployed.RO.vnfd if not present it will be appended at the end
        for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
            if vnf_deployed["member-vnf-index"] == member_vnf_index:
                break
        else:
            index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
            # placeholder, it is replaced by RO_update below
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

        # look if present
        RO_update = {"member-vnf-index": member_vnf_index}
        vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
        if vnfd_list:
            RO_update["id"] = vnfd_list[0]["uuid"]
            self.logger.debug(logging_text + "vnfd='{}'  member_vnf_index='{}' exists at RO. Using RO_id={}".
                              format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
        else:
            vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[member_vnf_index].
                                   get("additionalParamsForVnf"), nsr_id)
            desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
            RO_update["id"] = desc["uuid"]
            self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                vnfd_ref, member_vnf_index, desc["uuid"]))
        db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
        db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # create nsd at RO
    nsd_ref = nsd["id"]
    step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
    # self.logger.debug(logging_text + step)

    RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
    RO_descriptor_number += 1
    nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
    if nsd_list:
        db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
        self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
            nsd_ref, RO_nsd_uuid))
    else:
        # work over a copy: the references to vnfd are replaced by the ids used at RO
        nsd_RO = deepcopy(nsd)
        nsd_RO["id"] = RO_osm_nsd_id
        nsd_RO.pop("_id", None)
        nsd_RO.pop("_admin", None)
        for c_vnf in nsd_RO.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
        for c_vld in nsd_RO.get("vld", ()):
            for cp in c_vld.get("vnfd-connection-point-ref", ()):
                member_vnf_index = cp["member-vnf-index-ref"]
                cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

        desc = await self.RO.create("nsd", descriptor=nsd_RO)
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
        self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # Create ns at RO
    # if present use it unless in error status
    RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
    if RO_nsr_id:
        try:
            step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
            # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
            desc = await self.RO.show("ns", RO_nsr_id)

        except ROclient.ROClientException as e:
            if e.http_code != HTTPStatus.NOT_FOUND:
                raise
            # the stored id is not at RO any more: forget it, a new ns is created below
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if RO_nsr_id:
            ns_status, ns_status_info = self.RO.check_ns_status(desc)
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
                    .format(RO_nsr_id)
                self.logger.debug(logging_text + step)
                await self.RO.delete("ns", RO_nsr_id)
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
    if not RO_nsr_id:
        step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
        # self.logger.debug(logging_text + step)

        # check if VIM is creating and wait  look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
        if task_dependency:
            step = "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            await asyncio.wait(task_dependency, timeout=3600)
        if ns_params.get("vnf"):
            for vnf in ns_params["vnf"]:
                if "vimAccountId" in vnf:
                    task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                vnf["vimAccountId"])
                    if task_dependency:
                        step = "Waiting for related tasks to be completed: {}".format(task_name)
                        self.logger.debug(logging_text + step)
                        await asyncio.wait(task_dependency, timeout=3600)

        step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"

        RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)

        step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
        # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
        desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
        RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
        self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # wait until NS is ready
    step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
    detailed_status_old = None
    self.logger.debug(logging_text + step)

    old_desc = None
    while time() <= start_deploy + self.total_deploy_timeout:
        desc = await self.RO.show("ns", RO_nsr_id)

        # deploymentStatus
        if desc != old_desc:
            # desc has changed => update db
            self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
            old_desc = desc

        ns_status, ns_status_info = self.RO.check_ns_status(desc)
        db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
        if ns_status == "ERROR":
            raise ROclient.ROClientException(ns_status_info)
        elif ns_status == "BUILD":
            detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
        elif ns_status == "ACTIVE":
            step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
            try:
                if vdu_flag:
                    self.ns_update_vnfr(db_vnfrs, desc)
                break
            except LcmExceptionNoMgmtIP:
                # management ip not available yet: keep polling
                pass
        else:
            # explicit raise instead of 'assert False', so that the check is kept when running with -O
            raise AssertionError("ROclient.check_ns_status returns unknown {}".format(ns_status))
        if detailed_status != detailed_status_old:
            detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        # no 'loop' argument: it is not accepted by asyncio.sleep since python 3.10
        await asyncio.sleep(5)
    else:   # total_deploy_timeout
        raise ROclient.ROClientException("Timeout waiting ns to be ready")

    step = "Updating NSR"
    self.ns_update_nsr(db_nsr_update, db_nsr, desc)

    db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
    # db_nsr is the nested record, so the nested path is used (dotted keys are only for db_nsr_update)
    populate_dict(db_nsr, ("_admin", "deployed", "RO", "detailed-status"), "Deployed at VIM")
    db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)

    step = "Deployed at VIM"
    self.logger.debug(logging_text + step)
953
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip addres at RO, and optionally, insert public key in virtual machine
    The database is polled every 10 seconds, up to 360 iterations in total for all the phases.
    :param logging_text: prefix use for logging
    :param nsr_id: _id of the "nsrs" record where _admin.deployed.RO.nsr_id is looked for
    :param vnfr_id: _id of the "vnfrs" record where the ip address is looked for
    :param vdu_id: vdu-id-ref of the target vdur. If empty, the vdur holding the vnfr "ip-address" is used
    :param vdu_index: count-index of the target vdur. Only used when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, if the target vdur is not found or is in ERROR status, or if the key
        cannot be injected after 20 tries
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            # the counter is shared by all the phases, so report the one that was pending
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            if not target_vdu_id:
                raise LcmException("Timeout waiting VM to be ACTIVE with ip address for vnfr_id={}, vdu_id={}, "
                                   "vdu_index={}".format(vnfr_id, vdu_id, vdu_index))
            raise LcmException("Timeout injecting ssh-key for vnfr_id={}, vdu_id={}, vdu_index={}".format(
                vnfr_id, vdu_id, vdu_index))

        # no 'loop' argument: it is not accepted by asyncio.sleep since python 3.10
        await asyncio.sleep(10)
        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            # vdur is neither ACTIVE nor ERROR yet: poll again
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        # inject public key into machine
        if pub_key and user:
            # self.logger.debug(logging_text + "Inserting RO key")
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                # RO errors are retried at the next iterations, up to 20 tries
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e
        else:
            break

    return ip_address
1050
1051 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1052 """
1053 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1054 """
1055 my_vca = vca_deployed_list[vca_index]
1056 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1057 # vdu or kdu: no dependencies
1058 return
1059 timeout = 300
1060 while timeout >= 0:
1061 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1062 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1063 configuration_status_list = db_nsr["configurationStatus"]
1064 for index, vca_deployed in enumerate(configuration_status_list):
1065 if index == vca_index:
1066 # myself
1067 continue
1068 if not my_vca.get("member-vnf-index") or \
1069 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1070 internal_status = configuration_status_list[index].get("status")
1071 if internal_status == 'READY':
1072 continue
1073 elif internal_status == 'BROKEN':
1074 raise LcmException("Configuration aborted because dependent charm/s has failed")
1075 else:
1076 break
1077 else:
1078 # no dependencies, return
1079 return
1080 await asyncio.sleep(10)
1081 timeout -= 1
1082
1083 raise LcmException("Configuration aborted because dependent charm/s timeout")
1084
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
                           vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
    """
    Deploy the charm of one element (NS, VNF or VDU) by means of self.n2vc and execute its initial config
    primitives. The status is written at configurationStatus.<vca_index> of the "nsrs" record at every phase.
    :param logging_text: prefix use for logging
    :param vca_index: position of this element at _admin.deployed.VCA and at configurationStatus
    :param nsi_id: used as first part of the namespace. Can be None
    :param db_nsr: content of the nsrs record. "_id" and _admin.deployed.VCA are read
    :param db_vnfr: content of the vnfr record, or None for a NS charm
    :param vdu_id: vdu identifier for a VDU charm, None otherwise
    :param kdu_name: not used by this method
    :param vdu_index: index of the vdu instance. None is considered as 0 to compose the names
    :param config_descriptor: configuration section of the descriptor. "juju", "config-access" and
        "initial-config-primitive" are read
    :param deploy_params: parameters for the primitives. "rw_mgmt_ip" (and "ns_config_info" for a NS charm)
        are added to this dictionary
    :param base_folder: dictionary with "folder" and "pkg-dir", used to compose the path of the charm
    :param nslcmop_id: _id of the "nslcmops" record where the stage is written
    :return: None
    :raises Exception: on any failure, with the current step prepended to the original error. Before, the
        configuration status is set to 'BROKEN'
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += "." + vnfr_id
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                # format() is used because vdu_index is an integer; same text as the one used at namespace
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            await self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            await self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'. Nothing to sort (nor to report) if there are no primitives
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # best effort: on a wrong or missing 'seq' the order of the descriptor is kept
                self.logger.error(logging_text + step + ": " + str(e))

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage = 'Stage 3/5: running Day-1 primitives for VDU'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage = 'Stage 4/5: running Day-1 primitives for VNF'
        else:
            # NS
            stage = 'Stage 5/5: running Day-1 primitives for NS'

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1317
1318 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1319 error_description: str = None):
1320 try:
1321 db_dict = dict()
1322 if ns_state:
1323 db_dict["nsState"] = ns_state
1324 db_dict["currentOperation"] = current_operation
1325 db_dict["currentOperationID"] = current_operation_id
1326 db_dict["errorDescription"] = error_description
1327 self.update_db_2("nsrs", nsr_id, db_dict)
1328 except Exception as e:
1329 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1330
1331 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1332 try:
1333 db_dict = dict()
1334 db_dict['queuePosition'] = queuePosition
1335 db_dict['stage'] = stage
1336 if error_message:
1337 db_dict['errorMessage'] = error_message
1338 self.update_db_2("nslcmops", op_id, db_dict)
1339 except Exception as e:
1340 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1341
1342 def _write_all_config_status(self, nsr_id: str, status: str):
1343 try:
1344 # nsrs record
1345 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1346 # configurationStatus
1347 config_status = db_nsr.get('configurationStatus')
1348 if config_status:
1349 # update status
1350 db_dict = dict()
1351 db_dict['configurationStatus'] = list()
1352 for c in config_status:
1353 c['status'] = status
1354 db_dict['configurationStatus'].append(c)
1355 self.update_db_2("nsrs", nsr_id, db_dict)
1356
1357 except Exception as e:
1358 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1359
1360 async def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str,
1361 element_under_configuration: str = None, element_type: str = None):
1362
1363 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1364 # .format(vca_index, status))
1365
1366 try:
1367 db_path = 'configurationStatus.{}.'.format(vca_index)
1368 db_dict = dict()
1369 db_dict[db_path + 'status'] = status
1370 if element_under_configuration:
1371 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1372 if element_type:
1373 db_dict[db_path + 'elementType'] = element_type
1374 self.update_db_2("nsrs", nsr_id, db_dict)
1375 except Exception as e:
1376 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1377 .format(status, nsr_id, vca_index, e))
1378
1379 async def instantiate(self, nsr_id, nslcmop_id):
1380 """
1381
1382 :param nsr_id: ns instance to deploy
1383 :param nslcmop_id: operation to run
1384 :return:
1385 """
1386
1387 # Try to lock HA task here
1388 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1389 if not task_is_locked_by_me:
1390 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1391 return
1392
1393 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1394 self.logger.debug(logging_text + "Enter")
1395
1396 # get all needed from database
1397
1398 # database nsrs record
1399 db_nsr = None
1400
1401 # database nslcmops record
1402 db_nslcmop = None
1403
1404 # update operation on nsrs
1405 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1406 "_admin.current-operation": nslcmop_id,
1407 "_admin.operation-type": "instantiate"}
1408 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1409
1410 # update operation on nslcmops
1411 db_nslcmop_update = {}
1412
1413 nslcmop_operation_state = None
1414 db_vnfrs = {} # vnf's info indexed by member-index
1415 # n2vc_info = {}
1416 task_instantiation_list = []
1417 task_instantiation_info = {} # from task to info text
1418 exc = None
1419 try:
1420 # wait for any previous tasks in process
1421 step = "Waiting for previous operations to terminate"
1422 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1423
1424 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1425
1426 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1427 self._write_ns_status(
1428 nsr_id=nsr_id,
1429 ns_state="BUILDING",
1430 current_operation="INSTANTIATING",
1431 current_operation_id=nslcmop_id
1432 )
1433
1434 # read from db: operation
1435 step = "Getting nslcmop={} from db".format(nslcmop_id)
1436 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1437
1438 # read from db: ns
1439 step = "Getting nsr={} from db".format(nsr_id)
1440 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1441 # nsd is replicated into ns (no db read)
1442 nsd = db_nsr["nsd"]
1443 # nsr_name = db_nsr["name"] # TODO short-name??
1444
1445 # read from db: vnf's of this ns
1446 step = "Getting vnfrs from db"
1447 self.logger.debug(logging_text + step)
1448 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1449
1450 # read from db: vnfd's for every vnf
1451 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1452 db_vnfds = {} # every vnfd data indexed by vnf id
1453 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1454
1455 self._write_op_status(
1456 op_id=nslcmop_id,
1457 stage='Stage 1/5: preparation of the environment',
1458 queuePosition=0
1459 )
1460
1461 # for each vnf in ns, read vnfd
1462 for vnfr in db_vnfrs_list:
1463 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1464 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1465 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1466 # if we haven't this vnfd, read it from db
1467 if vnfd_id not in db_vnfds:
1468 # read from cb
1469 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1470 self.logger.debug(logging_text + step)
1471 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1472
1473 # store vnfd
1474 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1475 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1476 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1477
1478 # Get or generates the _admin.deployed.VCA list
1479 vca_deployed_list = None
1480 if db_nsr["_admin"].get("deployed"):
1481 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1482 if vca_deployed_list is None:
1483 vca_deployed_list = []
1484 configuration_status_list = []
1485 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1486 db_nsr_update["configurationStatus"] = configuration_status_list
1487 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1488 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1489 elif isinstance(vca_deployed_list, dict):
1490 # maintain backward compatibility. Change a dict to list at database
1491 vca_deployed_list = list(vca_deployed_list.values())
1492 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1493 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1494
1495 db_nsr_update["detailed-status"] = "creating"
1496 db_nsr_update["operational-status"] = "init"
1497
1498 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1499 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1500 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1501
1502 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1503 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1504 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1505
1506 # n2vc_redesign STEP 2 Deploy Network Scenario
1507
1508 self._write_op_status(
1509 op_id=nslcmop_id,
1510 stage='Stage 2/5: deployment of VMs and execution environments'
1511 )
1512
1513 self.logger.debug(logging_text + "Before deploy_kdus")
1514 # Call to deploy_kdus in case exists the "vdu:kdu" param
1515 task_kdu = asyncio.ensure_future(
1516 self.deploy_kdus(
1517 logging_text=logging_text,
1518 nsr_id=nsr_id,
1519 db_nsr=db_nsr,
1520 db_vnfrs=db_vnfrs,
1521 )
1522 )
1523 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1524 task_instantiation_info[task_kdu] = "Deploy KDUs"
1525 task_instantiation_list.append(task_kdu)
1526 # n2vc_redesign STEP 1 Get VCA public ssh-key
1527 # feature 1429. Add n2vc public key to needed VMs
1528 n2vc_key = self.n2vc.get_public_key()
1529 n2vc_key_list = [n2vc_key]
1530 if self.vca_config.get("public_key"):
1531 n2vc_key_list.append(self.vca_config["public_key"])
1532
1533 task_ro = asyncio.ensure_future(
1534 self.instantiate_RO(
1535 logging_text=logging_text,
1536 nsr_id=nsr_id,
1537 nsd=nsd,
1538 db_nsr=db_nsr,
1539 db_nslcmop=db_nslcmop,
1540 db_vnfrs=db_vnfrs,
1541 db_vnfds_ref=db_vnfds_ref,
1542 n2vc_key_list=n2vc_key_list
1543 )
1544 )
1545 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1546 task_instantiation_info[task_ro] = "Deploy at VIM"
1547 task_instantiation_list.append(task_ro)
1548
1549 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1550 step = "Looking for needed vnfd to configure with proxy charm"
1551 self.logger.debug(logging_text + step)
1552
1553 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1554 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1555 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1556 vnfd_id = c_vnf["vnfd-id-ref"]
1557 vnfd = db_vnfds_ref[vnfd_id]
1558 member_vnf_index = str(c_vnf["member-vnf-index"])
1559 db_vnfr = db_vnfrs[member_vnf_index]
1560 base_folder = vnfd["_admin"]["storage"]
1561 vdu_id = None
1562 vdu_index = 0
1563 vdu_name = None
1564 kdu_name = None
1565
1566 # Get additional parameters
1567 deploy_params = {}
1568 if db_vnfr.get("additionalParamsForVnf"):
1569 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1570
1571 descriptor_config = vnfd.get("vnf-configuration")
1572 if descriptor_config and descriptor_config.get("juju"):
1573 self._deploy_n2vc(
1574 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1575 db_nsr=db_nsr,
1576 db_vnfr=db_vnfr,
1577 nslcmop_id=nslcmop_id,
1578 nsr_id=nsr_id,
1579 nsi_id=nsi_id,
1580 vnfd_id=vnfd_id,
1581 vdu_id=vdu_id,
1582 kdu_name=kdu_name,
1583 member_vnf_index=member_vnf_index,
1584 vdu_index=vdu_index,
1585 vdu_name=vdu_name,
1586 deploy_params=deploy_params,
1587 descriptor_config=descriptor_config,
1588 base_folder=base_folder,
1589 task_instantiation_list=task_instantiation_list,
1590 task_instantiation_info=task_instantiation_info
1591 )
1592
1593 # Deploy charms for each VDU that supports one.
1594 for vdud in get_iterable(vnfd, 'vdu'):
1595 vdu_id = vdud["id"]
1596 descriptor_config = vdud.get('vdu-configuration')
1597 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1598 if vdur.get("additionalParams"):
1599 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1600 else:
1601 deploy_params_vdu = deploy_params
1602 if descriptor_config and descriptor_config.get("juju"):
1603 # look for vdu index in the db_vnfr["vdu"] section
1604 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1605 # if vdur["vdu-id-ref"] == vdu_id:
1606 # break
1607 # else:
1608 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1609 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1610 # vdu_name = vdur.get("name")
1611 vdu_name = None
1612 kdu_name = None
1613 for vdu_index in range(int(vdud.get("count", 1))):
1614 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1615 self._deploy_n2vc(
1616 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1617 member_vnf_index, vdu_id, vdu_index),
1618 db_nsr=db_nsr,
1619 db_vnfr=db_vnfr,
1620 nslcmop_id=nslcmop_id,
1621 nsr_id=nsr_id,
1622 nsi_id=nsi_id,
1623 vnfd_id=vnfd_id,
1624 vdu_id=vdu_id,
1625 kdu_name=kdu_name,
1626 member_vnf_index=member_vnf_index,
1627 vdu_index=vdu_index,
1628 vdu_name=vdu_name,
1629 deploy_params=deploy_params_vdu,
1630 descriptor_config=descriptor_config,
1631 base_folder=base_folder,
1632 task_instantiation_list=task_instantiation_list,
1633 task_instantiation_info=task_instantiation_info
1634 )
1635 for kdud in get_iterable(vnfd, 'kdu'):
1636 kdu_name = kdud["name"]
1637 descriptor_config = kdud.get('kdu-configuration')
1638 if descriptor_config and descriptor_config.get("juju"):
1639 vdu_id = None
1640 vdu_index = 0
1641 vdu_name = None
1642 # look for vdu index in the db_vnfr["vdu"] section
1643 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1644 # if vdur["vdu-id-ref"] == vdu_id:
1645 # break
1646 # else:
1647 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1648 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1649 # vdu_name = vdur.get("name")
1650 # vdu_name = None
1651
1652 self._deploy_n2vc(
1653 logging_text=logging_text,
1654 db_nsr=db_nsr,
1655 db_vnfr=db_vnfr,
1656 nslcmop_id=nslcmop_id,
1657 nsr_id=nsr_id,
1658 nsi_id=nsi_id,
1659 vnfd_id=vnfd_id,
1660 vdu_id=vdu_id,
1661 kdu_name=kdu_name,
1662 member_vnf_index=member_vnf_index,
1663 vdu_index=vdu_index,
1664 vdu_name=vdu_name,
1665 deploy_params=deploy_params,
1666 descriptor_config=descriptor_config,
1667 base_folder=base_folder,
1668 task_instantiation_list=task_instantiation_list,
1669 task_instantiation_info=task_instantiation_info
1670 )
1671
1672 # Check if this NS has a charm configuration
1673 descriptor_config = nsd.get("ns-configuration")
1674 if descriptor_config and descriptor_config.get("juju"):
1675 vnfd_id = None
1676 db_vnfr = None
1677 member_vnf_index = None
1678 vdu_id = None
1679 kdu_name = None
1680 vdu_index = 0
1681 vdu_name = None
1682
1683 # Get additional parameters
1684 deploy_params = {}
1685 if db_nsr.get("additionalParamsForNs"):
1686 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1687 base_folder = nsd["_admin"]["storage"]
1688 self._deploy_n2vc(
1689 logging_text=logging_text,
1690 db_nsr=db_nsr,
1691 db_vnfr=db_vnfr,
1692 nslcmop_id=nslcmop_id,
1693 nsr_id=nsr_id,
1694 nsi_id=nsi_id,
1695 vnfd_id=vnfd_id,
1696 vdu_id=vdu_id,
1697 kdu_name=kdu_name,
1698 member_vnf_index=member_vnf_index,
1699 vdu_index=vdu_index,
1700 vdu_name=vdu_name,
1701 deploy_params=deploy_params,
1702 descriptor_config=descriptor_config,
1703 base_folder=base_folder,
1704 task_instantiation_list=task_instantiation_list,
1705 task_instantiation_info=task_instantiation_info
1706 )
1707
1708 # Wait until all tasks of "task_instantiation_list" have been finished
1709
1710 # while time() <= start_deploy + self.total_deploy_timeout:
1711 error_text_list = []
1712 timeout = 3600
1713
1714 # let's begin with all OK
1715 instantiated_ok = True
1716 # let's begin with RO 'running' status (later we can change it)
1717 db_nsr_update["operational-status"] = "running"
1718 # let's begin with VCA 'configured' status (later we can change it)
1719 db_nsr_update["config-status"] = "configured"
1720
1721 if task_instantiation_list:
1722 # wait for all tasks completion
1723 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1724
1725 for task in pending:
1726 instantiated_ok = False
1727 if task == task_ro:
1728 # RO task is pending
1729 db_nsr_update["operational-status"] = "failed"
1730 elif task == task_kdu:
1731 # KDU task is pending
1732 db_nsr_update["operational-status"] = "failed"
1733 else:
1734 # A N2VC task is pending
1735 db_nsr_update["config-status"] = "failed"
1736 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1737 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1738 for task in done:
1739 if task.cancelled():
1740 instantiated_ok = False
1741 if task == task_ro:
1742 # RO task was cancelled
1743 db_nsr_update["operational-status"] = "failed"
1744 elif task == task_kdu:
1745 # KDU task was cancelled
1746 db_nsr_update["operational-status"] = "failed"
1747 else:
1748 # A N2VC was cancelled
1749 db_nsr_update["config-status"] = "failed"
1750 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1751 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1752 else:
1753 exc = task.exception()
1754 if exc:
1755 instantiated_ok = False
1756 if task == task_ro:
1757 # RO task raised an exception
1758 db_nsr_update["operational-status"] = "failed"
1759 elif task == task_kdu:
1760 # KDU task raised an exception
1761 db_nsr_update["operational-status"] = "failed"
1762 else:
1763 # A N2VC task raised an exception
1764 db_nsr_update["config-status"] = "failed"
1765 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1766
1767 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1768 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1769 else:
1770 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1771 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1772 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1773 else:
1774 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1775
1776 if error_text_list:
1777 error_text = "\n".join(error_text_list)
1778 db_nsr_update["detailed-status"] = error_text
1779 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1780 db_nslcmop_update["detailed-status"] = error_text
1781 db_nslcmop_update["statusEnteredTime"] = time()
1782 else:
1783 # all is done
1784 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1785 db_nslcmop_update["statusEnteredTime"] = time()
1786 db_nslcmop_update["detailed-status"] = "done"
1787 db_nsr_update["detailed-status"] = "done"
1788
1789 except (ROclient.ROClientException, DbException, LcmException) as e:
1790 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1791 exc = e
1792 except asyncio.CancelledError:
1793 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1794 exc = "Operation was cancelled"
1795 except Exception as e:
1796 exc = traceback.format_exc()
1797 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1798 exc_info=True)
1799 finally:
1800 if exc:
1801 if db_nsr:
1802 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1803 db_nsr_update["operational-status"] = "failed"
1804 db_nsr_update["config-status"] = "failed"
1805 if db_nslcmop:
1806 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1807 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1808 db_nslcmop_update["statusEnteredTime"] = time()
1809 try:
1810 if db_nsr:
1811 db_nsr_update["_admin.nslcmop"] = None
1812 db_nsr_update["_admin.current-operation"] = None
1813 db_nsr_update["_admin.operation-type"] = None
1814 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1815
1816 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1817 ns_state = None
1818 error_description = None
1819 if instantiated_ok:
1820 ns_state = "READY"
1821 else:
1822 ns_state = "BROKEN"
1823 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1824
1825 self._write_ns_status(
1826 nsr_id=nsr_id,
1827 ns_state=ns_state,
1828 current_operation="IDLE",
1829 current_operation_id=None,
1830 error_description=error_description
1831 )
1832
1833 self._write_op_status(
1834 op_id=nslcmop_id,
1835 error_message=error_description
1836 )
1837
1838 if db_nslcmop_update:
1839 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1840
1841 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1842
1843 except DbException as e:
1844 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1845
1846 if nslcmop_operation_state:
1847 try:
1848 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1849 "operationState": nslcmop_operation_state},
1850 loop=self.loop)
1851 except Exception as e:
1852 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1853
1854 self.logger.debug(logging_text + "Exit")
1855 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1856
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """
    Launch every KDU listed in the "kdur" section of the given VNF records.

    For each KDU an entry is written at nsrs "_admin.deployed.K8s.<index>" and an install task is started on
    the helm or juju K8s connector. The method then waits for the tasks and stores, in that same entry, either
    the value returned by the install ("kdu-instance") or the error text ("detailed-status").

    :param logging_text: prefix prepended to every log message
    :param nsr_id: _id of the nsrs record to update
    :param db_nsr: nsrs record content (not read by this method)
    :param db_vnfrs: dictionary whose values are the vnfrs records to inspect
    :return: None
    :raises LcmException: on any failure, including a failed or timed-out KDU installation
    """
    deployed_ok = True
    # 'step' is formatted by the generic exception handler below, so it must exist before entering the 'try'
    step = "Preparing the deployment of KDUs"

    # cache of already resolved clusters: {cluster_type: {cluster_id: k8s_id}}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # Return the "_admin.<cluster_type>.id" of the k8scluster, reading the database only the first time
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}   # task -> database path prefix of its K8s entry
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"
                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                except LcmException as e:
                    error_text = str(e)
                    deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # NOTE(review): 'index' is not advanced here, so the next KDU reuses this K8s slot.
                    # Confirm this is intended.
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
                           "{}".format(index)}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1
        if not pending_tasks:
            # NOTE(review): returns without raising even when a cluster lookup failed above (deployed_ok False)
            return
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:   # timeout
                for task in pending_list:
                    # pending_tasks is a dictionary: it must be indexed, not called
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                deployed_ok = False
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                    deployed_ok = False
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)) \
            from e
    finally:
        # whatever happened, store what is known about each KDU
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1968
1969 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1970 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1971 base_folder, task_instantiation_list, task_instantiation_info):
1972 # launch instantiate_N2VC in a asyncio task and register task object
1973 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1974 # if not found, create one entry and update database
1975
1976 # fill db_nsr._admin.deployed.VCA.<index>
1977 vca_index = -1
1978 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1979 if not vca_deployed:
1980 continue
1981 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1982 vca_deployed.get("vdu_id") == vdu_id and \
1983 vca_deployed.get("kdu_name") == kdu_name and \
1984 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1985 break
1986 else:
1987 # not found, create one.
1988 vca_deployed = {
1989 "member-vnf-index": member_vnf_index,
1990 "vdu_id": vdu_id,
1991 "kdu_name": kdu_name,
1992 "vdu_count_index": vdu_index,
1993 "operational-status": "init", # TODO revise
1994 "detailed-status": "", # TODO revise
1995 "step": "initial-deploy", # TODO revise
1996 "vnfd_id": vnfd_id,
1997 "vdu_name": vdu_name,
1998 }
1999 vca_index += 1
2000
2001 # create VCA and configurationStatus in db
2002 db_dict = {
2003 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2004 "configurationStatus.{}".format(vca_index): dict()
2005 }
2006 self.update_db_2("nsrs", nsr_id, db_dict)
2007
2008 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2009
2010 # Launch task
2011 task_n2vc = asyncio.ensure_future(
2012 self.instantiate_N2VC(
2013 logging_text=logging_text,
2014 vca_index=vca_index,
2015 nsi_id=nsi_id,
2016 db_nsr=db_nsr,
2017 db_vnfr=db_vnfr,
2018 vdu_id=vdu_id,
2019 kdu_name=kdu_name,
2020 vdu_index=vdu_index,
2021 deploy_params=deploy_params,
2022 config_descriptor=descriptor_config,
2023 base_folder=base_folder,
2024 nslcmop_id=nslcmop_id
2025 )
2026 )
2027 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2028 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
2029 task_instantiation_list.append(task_n2vc)
2030
2031 # Check if this VNFD has a configured terminate action
2032 def _has_terminate_config_primitive(self, vnfd):
2033 vnf_config = vnfd.get("vnf-configuration")
2034 if vnf_config and vnf_config.get("terminate-config-primitive"):
2035 return True
2036 else:
2037 return False
2038
2039 @staticmethod
2040 def _get_terminate_config_primitive_seq_list(vnfd):
2041 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2042 # No need to check for existing primitive twice, already done before
2043 vnf_config = vnfd.get("vnf-configuration")
2044 seq_list = vnf_config.get("terminate-config-primitive")
2045 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2046 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2047 return seq_list_sorted
2048
2049 @staticmethod
2050 def _create_nslcmop(nsr_id, operation, params):
2051 """
2052 Creates a ns-lcm-opp content to be stored at database.
2053 :param nsr_id: internal id of the instance
2054 :param operation: instantiate, terminate, scale, action, ...
2055 :param params: user parameters for the operation
2056 :return: dictionary following SOL005 format
2057 """
2058 # Raise exception if invalid arguments
2059 if not (nsr_id and operation and params):
2060 raise LcmException(
2061 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2062 now = time()
2063 _id = str(uuid4())
2064 nslcmop = {
2065 "id": _id,
2066 "_id": _id,
2067 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2068 "operationState": "PROCESSING",
2069 "statusEnteredTime": now,
2070 "nsInstanceId": nsr_id,
2071 "lcmOperationType": operation,
2072 "startTime": now,
2073 "isAutomaticInvocation": False,
2074 "operationParams": params,
2075 "isCancelPending": False,
2076 "links": {
2077 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2078 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2079 }
2080 }
2081 return nslcmop
2082
2083 def _format_additional_params(self, params):
2084 params = params or {}
2085 for key, value in params.items():
2086 if str(value).startswith("!!yaml "):
2087 params[key] = yaml.safe_load(value[7:])
2088 return params
2089
2090 def _get_terminate_primitive_params(self, seq, vnf_index):
2091 primitive = seq.get('name')
2092 primitive_params = {}
2093 params = {
2094 "member_vnf_index": vnf_index,
2095 "primitive": primitive,
2096 "primitive_params": primitive_params,
2097 }
2098 desc_params = {}
2099 return self._map_primitive_params(seq, params, desc_params)
2100
2101 # sub-operations
2102
2103 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2104 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2105 if (op.get('operationState') == 'COMPLETED'):
2106 # b. Skip sub-operation
2107 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2108 return self.SUBOPERATION_STATUS_SKIP
2109 else:
2110 # c. Reintent executing sub-operation
2111 # The sub-operation exists, and operationState != 'COMPLETED'
2112 # Update operationState = 'PROCESSING' to indicate a reintent.
2113 operationState = 'PROCESSING'
2114 detailed_status = 'In progress'
2115 self._update_suboperation_status(
2116 db_nslcmop, op_index, operationState, detailed_status)
2117 # Return the sub-operation index
2118 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2119 # with arguments extracted from the sub-operation
2120 return op_index
2121
2122 # Find a sub-operation where all keys in a matching dictionary must match
2123 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2124 def _find_suboperation(self, db_nslcmop, match):
2125 if (db_nslcmop and match):
2126 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2127 for i, op in enumerate(op_list):
2128 if all(op.get(k) == match[k] for k in match):
2129 return i
2130 return self.SUBOPERATION_STATUS_NOT_FOUND
2131
2132 # Update status for a sub-operation given its index
2133 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2134 # Update DB for HA tasks
2135 q_filter = {'_id': db_nslcmop['_id']}
2136 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2137 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2138 self.db.set_one("nslcmops",
2139 q_filter=q_filter,
2140 update_dict=update_dict,
2141 fail_on_empty=False)
2142
2143 # Add sub-operation, return the index of the added sub-operation
2144 # Optionally, set operationState, detailed-status, and operationType
2145 # Status and type are currently set for 'scale' sub-operations:
2146 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2147 # 'detailed-status' : status message
2148 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2149 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2150 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2151 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2152 RO_nsr_id=None, RO_scaling_info=None):
2153 if not (db_nslcmop):
2154 return self.SUBOPERATION_STATUS_NOT_FOUND
2155 # Get the "_admin.operations" list, if it exists
2156 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2157 op_list = db_nslcmop_admin.get('operations')
2158 # Create or append to the "_admin.operations" list
2159 new_op = {'member_vnf_index': vnf_index,
2160 'vdu_id': vdu_id,
2161 'vdu_count_index': vdu_count_index,
2162 'primitive': primitive,
2163 'primitive_params': mapped_primitive_params}
2164 if operationState:
2165 new_op['operationState'] = operationState
2166 if detailed_status:
2167 new_op['detailed-status'] = detailed_status
2168 if operationType:
2169 new_op['lcmOperationType'] = operationType
2170 if RO_nsr_id:
2171 new_op['RO_nsr_id'] = RO_nsr_id
2172 if RO_scaling_info:
2173 new_op['RO_scaling_info'] = RO_scaling_info
2174 if not op_list:
2175 # No existing operations, create key 'operations' with current operation as first list element
2176 db_nslcmop_admin.update({'operations': [new_op]})
2177 op_list = db_nslcmop_admin.get('operations')
2178 else:
2179 # Existing operations, append operation to list
2180 op_list.append(new_op)
2181
2182 db_nslcmop_update = {'_admin.operations': op_list}
2183 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2184 op_index = len(op_list) - 1
2185 return op_index
2186
2187 # Helper methods for scale() sub-operations
2188
2189 # pre-scale/post-scale:
2190 # Check for 3 different cases:
2191 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2192 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2193 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2194 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2195 operationType, RO_nsr_id=None, RO_scaling_info=None):
2196 # Find this sub-operation
2197 if (RO_nsr_id and RO_scaling_info):
2198 operationType = 'SCALE-RO'
2199 match = {
2200 'member_vnf_index': vnf_index,
2201 'RO_nsr_id': RO_nsr_id,
2202 'RO_scaling_info': RO_scaling_info,
2203 }
2204 else:
2205 match = {
2206 'member_vnf_index': vnf_index,
2207 'primitive': vnf_config_primitive,
2208 'primitive_params': primitive_params,
2209 'lcmOperationType': operationType
2210 }
2211 op_index = self._find_suboperation(db_nslcmop, match)
2212 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2213 # a. New sub-operation
2214 # The sub-operation does not exist, add it.
2215 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2216 # The following parameters are set to None for all kind of scaling:
2217 vdu_id = None
2218 vdu_count_index = None
2219 vdu_name = None
2220 if (RO_nsr_id and RO_scaling_info):
2221 vnf_config_primitive = None
2222 primitive_params = None
2223 else:
2224 RO_nsr_id = None
2225 RO_scaling_info = None
2226 # Initial status for sub-operation
2227 operationState = 'PROCESSING'
2228 detailed_status = 'In progress'
2229 # Add sub-operation for pre/post-scaling (zero or more operations)
2230 self._add_suboperation(db_nslcmop,
2231 vnf_index,
2232 vdu_id,
2233 vdu_count_index,
2234 vdu_name,
2235 vnf_config_primitive,
2236 primitive_params,
2237 operationState,
2238 detailed_status,
2239 operationType,
2240 RO_nsr_id,
2241 RO_scaling_info)
2242 return self.SUBOPERATION_STATUS_NEW
2243 else:
2244 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2245 # or op_index (operationState != 'COMPLETED')
2246 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2247
2248 # Function to return execution_environment id
2249
2250 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2251 for vca in vca_deployed_list:
2252 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2253 return vca["ee_id"]
2254
2255 # Helper methods for terminate()
2256
2257 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2258 """ Create a primitive with params from VNFD
2259 Called from terminate() before deleting instance
2260 Calls action() to execute the primitive """
2261 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2262 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2263 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2264 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2265 db_vnfds = {}
2266 # Loop over VNFRs
2267 for vnfr in db_vnfrs_list:
2268 vnfd_id = vnfr["vnfd-id"]
2269 vnf_index = vnfr["member-vnf-index-ref"]
2270 if vnfd_id not in db_vnfds:
2271 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2272 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2273 db_vnfds[vnfd_id] = vnfd
2274 vnfd = db_vnfds[vnfd_id]
2275 if not self._has_terminate_config_primitive(vnfd):
2276 continue
2277 # Get the primitive's sorted sequence list
2278 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2279 for seq in seq_list:
2280 # For each sequence in list, get primitive and call _ns_execute_primitive()
2281 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2282 vnf_index, seq.get("name"))
2283 self.logger.debug(logging_text + step)
2284 # Create the primitive for each sequence, i.e. "primitive": "touch"
2285 primitive = seq.get('name')
2286 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2287 # The following 3 parameters are currently set to None for 'terminate':
2288 # vdu_id, vdu_count_index, vdu_name
2289 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2290 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2291 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2292 # Add sub-operation
2293 self._add_suboperation(db_nslcmop,
2294 nslcmop_id,
2295 vnf_index,
2296 vdu_id,
2297 vdu_count_index,
2298 vdu_name,
2299 primitive,
2300 mapped_primitive_params)
2301 # Sub-operations: Call _ns_execute_primitive() instead of action()
2302 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2303 # nsr_deployed = db_nsr["_admin"]["deployed"]
2304
2305 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2306 # nsr_id, nslcmop_terminate_action_id)
2307 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2308 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2309 # if result not in result_ok:
2310 # raise LcmException(
2311 # "terminate_primitive_action for vnf_member_index={}",
2312 # " primitive={} fails with error {}".format(
2313 # vnf_index, seq.get("name"), result_detail))
2314
2315 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2316 try:
2317 await self.n2vc.exec_primitive(
2318 ee_id=ee_id,
2319 primitive_name=primitive,
2320 params_dict=mapped_primitive_params
2321 )
2322 except Exception as e:
2323 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2324 raise LcmException(
2325 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2326 .format(vnf_index, seq.get("name"), e),
2327 )
2328
2329 async def _delete_N2VC(self, nsr_id: str):
2330 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2331 namespace = "." + nsr_id
2332 await self.n2vc.delete_namespace(namespace=namespace)
2333 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2334
2335 async def terminate(self, nsr_id, nslcmop_id):
2336
2337 # Try to lock HA task here
2338 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2339 if not task_is_locked_by_me:
2340 return
2341
2342 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2343 self.logger.debug(logging_text + "Enter")
2344 db_nsr = None
2345 db_nslcmop = None
2346 exc = None
2347 failed_detail = [] # annotates all failed error messages
2348 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2349 "_admin.current-operation": nslcmop_id,
2350 "_admin.operation-type": "terminate"}
2351 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2352 db_nslcmop_update = {}
2353 nslcmop_operation_state = None
2354 autoremove = False # autoremove after terminated
2355 pending_tasks = []
2356 try:
2357 # wait for any previous tasks in process
2358 step = "Waiting for previous operations to terminate"
2359 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2360
2361 self._write_ns_status(
2362 nsr_id=nsr_id,
2363 ns_state="TERMINATING",
2364 current_operation="TERMINATING",
2365 current_operation_id=nslcmop_id
2366 )
2367 self._write_op_status(
2368 op_id=nslcmop_id,
2369 queuePosition=0
2370 )
2371
2372 step = "Getting nslcmop={} from db".format(nslcmop_id)
2373 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2374 step = "Getting nsr={} from db".format(nsr_id)
2375 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2376 # nsd = db_nsr["nsd"]
2377 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2378 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2379 return
2380 # #TODO check if VIM is creating and wait
2381 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2382 # Call internal terminate action
2383 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2384
2385 pending_tasks = []
2386
2387 db_nsr_update["operational-status"] = "terminating"
2388 db_nsr_update["config-status"] = "terminating"
2389
2390 # remove NS
2391 try:
2392 step = "delete execution environment"
2393 self.logger.debug(logging_text + step)
2394
2395 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2396 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2397
2398 pending_tasks.append(task_delete_ee)
2399 except Exception as e:
2400 msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
2401 self.logger.error(msg)
2402 failed_detail.append(msg)
2403
2404 try:
2405 # Delete from k8scluster
2406 step = "delete kdus"
2407 self.logger.debug(logging_text + step)
2408 # print(nsr_deployed)
2409 if nsr_deployed:
2410 for kdu in nsr_deployed.get("K8s", ()):
2411 kdu_instance = kdu.get("kdu-instance")
2412 if not kdu_instance:
2413 continue
2414 if kdu.get("k8scluster-type") == "chart":
2415 task_delete_kdu_instance = asyncio.ensure_future(
2416 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2417 kdu_instance=kdu_instance))
2418 elif kdu.get("k8scluster-type") == "juju":
2419 task_delete_kdu_instance = asyncio.ensure_future(
2420 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2421 kdu_instance=kdu_instance))
2422 else:
2423 self.error(logging_text + "Unknown k8s deployment type {}".
2424 format(kdu.get("k8scluster-type")))
2425 continue
2426 pending_tasks.append(task_delete_kdu_instance)
2427 except LcmException as e:
2428 msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
2429 self.logger.error(msg)
2430 failed_detail.append(msg)
2431
2432 # remove from RO
2433 RO_fail = False
2434
2435 # Delete ns
2436 RO_nsr_id = RO_delete_action = None
2437 if nsr_deployed and nsr_deployed.get("RO"):
2438 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2439 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2440 try:
2441 if RO_nsr_id:
2442 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2443 "Deleting ns from VIM"
2444 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2445 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2446 self.logger.debug(logging_text + step)
2447 desc = await self.RO.delete("ns", RO_nsr_id)
2448 RO_delete_action = desc["action_id"]
2449 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2450 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2451 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2452 if RO_delete_action:
2453 # wait until NS is deleted from VIM
2454 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2455 format(RO_nsr_id, RO_delete_action)
2456 detailed_status_old = None
2457 self.logger.debug(logging_text + step)
2458
2459 delete_timeout = 20 * 60 # 20 minutes
2460 while delete_timeout > 0:
2461 desc = await self.RO.show(
2462 "ns",
2463 item_id_name=RO_nsr_id,
2464 extra_item="action",
2465 extra_item_id=RO_delete_action)
2466
2467 # deploymentStatus
2468 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2469
2470 ns_status, ns_status_info = self.RO.check_action_status(desc)
2471 if ns_status == "ERROR":
2472 raise ROclient.ROClientException(ns_status_info)
2473 elif ns_status == "BUILD":
2474 detailed_status = step + "; {}".format(ns_status_info)
2475 elif ns_status == "ACTIVE":
2476 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2477 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2478 break
2479 else:
2480 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2481 if detailed_status != detailed_status_old:
2482 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2483 db_nsr_update["detailed-status"] = detailed_status
2484 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2485 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2486 await asyncio.sleep(5, loop=self.loop)
2487 delete_timeout -= 5
2488 else: # delete_timeout <= 0:
2489 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2490
2491 except ROclient.ROClientException as e:
2492 if e.http_code == 404: # not found
2493 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2494 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2495 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2496 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2497 elif e.http_code == 409: # conflict
2498 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2499 self.logger.debug(logging_text + failed_detail[-1])
2500 RO_fail = True
2501 else:
2502 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2503 self.logger.error(logging_text + failed_detail[-1])
2504 RO_fail = True
2505
2506 # Delete nsd
2507 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2508 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2509 try:
2510 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2511 "Deleting nsd from RO"
2512 await self.RO.delete("nsd", RO_nsd_id)
2513 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2514 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2515 except ROclient.ROClientException as e:
2516 if e.http_code == 404: # not found
2517 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2518 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2519 elif e.http_code == 409: # conflict
2520 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2521 self.logger.debug(logging_text + failed_detail[-1])
2522 RO_fail = True
2523 else:
2524 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2525 self.logger.error(logging_text + failed_detail[-1])
2526 RO_fail = True
2527
2528 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2529 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2530 if not vnf_deployed or not vnf_deployed["id"]:
2531 continue
2532 try:
2533 RO_vnfd_id = vnf_deployed["id"]
2534 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2535 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2536 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2537 await self.RO.delete("vnfd", RO_vnfd_id)
2538 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2539 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2540 except ROclient.ROClientException as e:
2541 if e.http_code == 404: # not found
2542 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2543 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2544 elif e.http_code == 409: # conflict
2545 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2546 self.logger.debug(logging_text + failed_detail[-1])
2547 else:
2548 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2549 self.logger.error(logging_text + failed_detail[-1])
2550
2551 if failed_detail:
2552 terminate_ok = False
2553 self.logger.error(logging_text + " ;".join(failed_detail))
2554 db_nsr_update["operational-status"] = "failed"
2555 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2556 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2557 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2558 db_nslcmop_update["statusEnteredTime"] = time()
2559 else:
2560 terminate_ok = True
2561 db_nsr_update["operational-status"] = "terminated"
2562 db_nsr_update["detailed-status"] = "Done"
2563 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2564 db_nslcmop_update["detailed-status"] = "Done"
2565 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2566 db_nslcmop_update["statusEnteredTime"] = time()
2567 if db_nslcmop["operationParams"].get("autoremove"):
2568 autoremove = True
2569
2570 except (ROclient.ROClientException, DbException, LcmException) as e:
2571 self.logger.error(logging_text + "Exit Exception {}".format(e))
2572 exc = e
2573 except asyncio.CancelledError:
2574 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2575 exc = "Operation was cancelled"
2576 except Exception as e:
2577 exc = traceback.format_exc()
2578 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2579 finally:
2580 if exc and db_nslcmop:
2581 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2582 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2583 db_nslcmop_update["statusEnteredTime"] = time()
2584 try:
2585 if db_nslcmop and db_nslcmop_update:
2586 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2587 if db_nsr:
2588 db_nsr_update["_admin.nslcmop"] = None
2589 db_nsr_update["_admin.current-operation"] = None
2590 db_nsr_update["_admin.operation-type"] = None
2591 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2592
2593 if terminate_ok:
2594 ns_state = "IDLE"
2595 error_description = None
2596 error_detail = None
2597 else:
2598 ns_state = "BROKEN"
2599 error_detail = "; ".join(failed_detail)
2600 error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
2601 .format(nslcmop_id, step, error_detail)
2602
2603 self._write_ns_status(
2604 nsr_id=nsr_id,
2605 ns_state=ns_state,
2606 current_operation="IDLE",
2607 current_operation_id=None,
2608 error_description=error_description
2609 )
2610
2611 self._write_op_status(
2612 op_id=nslcmop_id,
2613 error_message=error_description
2614 )
2615
2616 except DbException as e:
2617 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2618 if nslcmop_operation_state:
2619 try:
2620 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2621 "operationState": nslcmop_operation_state,
2622 "autoremove": autoremove},
2623 loop=self.loop)
2624 except Exception as e:
2625 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2626
2627 # wait for pending tasks
2628 done = None
2629 pending = None
2630 if pending_tasks:
2631 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2632 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2633 if not pending:
2634 self.logger.debug(logging_text + 'All tasks finished...')
2635 else:
2636 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2637
2638 self.logger.debug(logging_text + "Exit")
2639 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2640
2641 @staticmethod
2642 def _map_primitive_params(primitive_desc, params, instantiation_params):
2643 """
2644 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2645 The default-value is used. If it is between < > it look for a value at instantiation_params
2646 :param primitive_desc: portion of VNFD/NSD that describes primitive
2647 :param params: Params provided by user
2648 :param instantiation_params: Instantiation params provided by user
2649 :return: a dictionary with the calculated params
2650 """
2651 calculated_params = {}
2652 for parameter in primitive_desc.get("parameter", ()):
2653 param_name = parameter["name"]
2654 if param_name in params:
2655 calculated_params[param_name] = params[param_name]
2656 elif "default-value" in parameter or "value" in parameter:
2657 if "value" in parameter:
2658 calculated_params[param_name] = parameter["value"]
2659 else:
2660 calculated_params[param_name] = parameter["default-value"]
2661 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2662 and calculated_params[param_name].endswith(">"):
2663 if calculated_params[param_name][1:-1] in instantiation_params:
2664 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2665 else:
2666 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2667 format(calculated_params[param_name], primitive_desc["name"]))
2668 else:
2669 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2670 format(param_name, primitive_desc["name"]))
2671
2672 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2673 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2674 width=256)
2675 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2676 calculated_params[param_name] = calculated_params[param_name][7:]
2677
2678 # add always ns_config_info if primitive name is config
2679 if primitive_desc["name"] == "config":
2680 if "ns_config_info" in instantiation_params:
2681 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2682 return calculated_params
2683
2684 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2685 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2686
2687 # find vca_deployed record for this action
2688 try:
2689 for vca_deployed in db_deployed["VCA"]:
2690 if not vca_deployed:
2691 continue
2692 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2693 continue
2694 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2695 continue
2696 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2697 continue
2698 break
2699 else:
2700 # vca_deployed not found
2701 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2702 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2703
2704 # get ee_id
2705 ee_id = vca_deployed.get("ee_id")
2706 if not ee_id:
2707 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2708 "execution environment"
2709 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2710
2711 if primitive == "config":
2712 primitive_params = {"params": primitive_params}
2713
2714 while retries >= 0:
2715 try:
2716 output = await self.n2vc.exec_primitive(
2717 ee_id=ee_id,
2718 primitive_name=primitive,
2719 params_dict=primitive_params
2720 )
2721 # execution was OK
2722 break
2723 except Exception as e:
2724 retries -= 1
2725 if retries >= 0:
2726 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2727 # wait and retry
2728 await asyncio.sleep(retries_interval, loop=self.loop)
2729 else:
2730 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2731
2732 return output, 'OK'
2733
2734 except Exception as e:
2735 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2736
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over a network service.
    The target is selected from the operationParams of the nslcmops record: with "vdu_id" the primitive is looked
    for at the vdu-configuration of the vnfd; with "kdu_name" one of "upgrade", "rollback" or "status" is run on
    the deployed KDU through the helm or juju connector; with only "member_vnf_index" it is looked for at the
    vnf-configuration; otherwise at the ns-configuration of the nsd.
    The result is written at the nslcmops record, the ns current operation is set back to IDLE and, when an
    operation state was obtained, an "actioned" message is written to the "ns" topic.
    :param nsr_id: _id of the nsrs database record
    :param nslcmop_id: _id of the nslcmops database record
    :return: None if the HA lock is not obtained. Otherwise a tuple (operation state, detail); both are None if
        the operation failed before the nslcmops record was read
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")    # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if not kdur:
                # report a clean error instead of failing with an AttributeError on None
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name != kdu["kdu-name"]:
                    continue
                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}

                # the same three operations are offered by both connectors
                if kdu.get("k8scluster-type") == "chart":
                    k8s_connector = self.k8sclusterhelm
                elif kdu.get("k8scluster-type") == "juju":
                    k8s_connector = self.k8sclusterjuju
                else:
                    msg = "k8scluster-type not defined"
                    raise LcmException(msg)

                if primitive == "upgrade":
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        # when the stored model has exactly two ':' separated parts keep only the first one
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    output = await k8s_connector.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                         kdu_instance=kdu.get("kdu-instance"),
                                                         atomic=True, kdu_model=kdu_model,
                                                         params=desc_params, db_dict=db_dict,
                                                         timeout=300)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                elif primitive == "rollback":
                    output = await k8s_connector.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu.get("kdu-instance"),
                                                          db_dict=db_dict)
                elif primitive == "status":
                    output = await k8s_connector.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                            kdu_instance=kdu.get("kdu-instance"))
                else:
                    # the KDU exists; do not report it as "not found"
                    raise LcmException("Primitive '{}' is not supported for KDU '{}'".format(primitive, kdu_name))
                break
            else:
                raise LcmException("KDU '{}' not found".format(kdu_name))

            # fill the operation state too, so that the notification is sent and the result returned
            if output:
                nslcmop_operation_state = 'COMPLETED'
                nslcmop_operation_state_detail = output
            else:
                nslcmop_operation_state = 'FAILED'
                nslcmop_operation_state_detail = ''
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail
            db_nslcmop_update["operationState"] = nslcmop_operation_state
            db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # vdur is None when the vnfr has no record for this vdu: keep the vnf level params
                if vdur and vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
                if exc:
                    self._write_op_status(
                        op_id=nslcmop_id,
                        error_message=nslcmop_operation_state_detail
                    )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # NOTE: returning from finally also discards any exception still in flight
        return nslcmop_operation_state, nslcmop_operation_state_detail
2985
2986 async def scale(self, nsr_id, nslcmop_id):
2987
2988 # Try to lock HA task here
2989 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2990 if not task_is_locked_by_me:
2991 return
2992
2993 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2994 self.logger.debug(logging_text + "Enter")
2995 # get all needed from database
2996 db_nsr = None
2997 db_nslcmop = None
2998 db_nslcmop_update = {}
2999 nslcmop_operation_state = None
3000 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
3001 "_admin.current-operation": nslcmop_id,
3002 "_admin.operation-type": "scale"}
3003 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3004 exc = None
3005 # in case of error, indicates what part of scale was failed to put nsr at error status
3006 scale_process = None
3007 old_operational_status = ""
3008 old_config_status = ""
3009 vnfr_scaled = False
3010 try:
3011 # wait for any previous tasks in process
3012 step = "Waiting for previous operations to terminate"
3013 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3014
3015 self._write_ns_status(
3016 nsr_id=nsr_id,
3017 ns_state=None,
3018 current_operation="SCALING",
3019 current_operation_id=nslcmop_id
3020 )
3021
3022 step = "Getting nslcmop from database"
3023 self.logger.debug(step + " after having waited for previous tasks to be completed")
3024 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3025 step = "Getting nsr from database"
3026 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3027
3028 old_operational_status = db_nsr["operational-status"]
3029 old_config_status = db_nsr["config-status"]
3030 step = "Parsing scaling parameters"
3031 # self.logger.debug(step)
3032 db_nsr_update["operational-status"] = "scaling"
3033 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3034 nsr_deployed = db_nsr["_admin"].get("deployed")
3035
3036 #######
3037 nsr_deployed = db_nsr["_admin"].get("deployed")
3038 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3039 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3040 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3041 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3042 #######
3043
3044 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3045 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3046 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3047 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3048 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3049
3050 # for backward compatibility
3051 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3052 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3053 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3054 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3055
3056 step = "Getting vnfr from database"
3057 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3058 step = "Getting vnfd from database"
3059 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3060
3061 step = "Getting scaling-group-descriptor"
3062 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3063 if scaling_descriptor["name"] == scaling_group:
3064 break
3065 else:
3066 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3067 "at vnfd:scaling-group-descriptor".format(scaling_group))
3068
3069 # cooldown_time = 0
3070 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3071 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3072 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3073 # break
3074
3075 # TODO check if ns is in a proper status
3076 step = "Sending scale order to VIM"
3077 nb_scale_op = 0
3078 if not db_nsr["_admin"].get("scaling-group"):
3079 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3080 admin_scale_index = 0
3081 else:
3082 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3083 if admin_scale_info["name"] == scaling_group:
3084 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3085 break
3086 else: # not found, set index one plus last element and add new entry with the name
3087 admin_scale_index += 1
3088 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3089 RO_scaling_info = []
3090 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3091 if scaling_type == "SCALE_OUT":
3092 # count if max-instance-count is reached
3093 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3094 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3095 if nb_scale_op >= max_instance_count:
3096 raise LcmException("reached the limit of {} (max-instance-count) "
3097 "scaling-out operations for the "
3098 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3099
3100 nb_scale_op += 1
3101 vdu_scaling_info["scaling_direction"] = "OUT"
3102 vdu_scaling_info["vdu-create"] = {}
3103 for vdu_scale_info in scaling_descriptor["vdu"]:
3104 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3105 "type": "create", "count": vdu_scale_info.get("count", 1)})
3106 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3107
3108 elif scaling_type == "SCALE_IN":
3109 # count if min-instance-count is reached
3110 min_instance_count = 0
3111 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3112 min_instance_count = int(scaling_descriptor["min-instance-count"])
3113 if nb_scale_op <= min_instance_count:
3114 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3115 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3116 nb_scale_op -= 1
3117 vdu_scaling_info["scaling_direction"] = "IN"
3118 vdu_scaling_info["vdu-delete"] = {}
3119 for vdu_scale_info in scaling_descriptor["vdu"]:
3120 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3121 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3122 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3123
3124 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3125 vdu_create = vdu_scaling_info.get("vdu-create")
3126 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3127 if vdu_scaling_info["scaling_direction"] == "IN":
3128 for vdur in reversed(db_vnfr["vdur"]):
3129 if vdu_delete.get(vdur["vdu-id-ref"]):
3130 vdu_delete[vdur["vdu-id-ref"]] -= 1
3131 vdu_scaling_info["vdu"].append({
3132 "name": vdur["name"],
3133 "vdu_id": vdur["vdu-id-ref"],
3134 "interface": []
3135 })
3136 for interface in vdur["interfaces"]:
3137 vdu_scaling_info["vdu"][-1]["interface"].append({
3138 "name": interface["name"],
3139 "ip_address": interface["ip-address"],
3140 "mac_address": interface.get("mac-address"),
3141 })
3142 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3143
3144 # PRE-SCALE BEGIN
3145 step = "Executing pre-scale vnf-config-primitive"
3146 if scaling_descriptor.get("scaling-config-action"):
3147 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3148 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3149 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3150 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3151 step = db_nslcmop_update["detailed-status"] = \
3152 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3153
3154 # look for primitive
3155 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3156 if config_primitive["name"] == vnf_config_primitive:
3157 break
3158 else:
3159 raise LcmException(
3160 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3161 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3162 "primitive".format(scaling_group, config_primitive))
3163
3164 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3165 if db_vnfr.get("additionalParamsForVnf"):
3166 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3167
3168 scale_process = "VCA"
3169 db_nsr_update["config-status"] = "configuring pre-scaling"
3170 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3171
3172 # Pre-scale reintent check: Check if this sub-operation has been executed before
3173 op_index = self._check_or_add_scale_suboperation(
3174 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3175 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3176 # Skip sub-operation
3177 result = 'COMPLETED'
3178 result_detail = 'Done'
3179 self.logger.debug(logging_text +
3180 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3181 vnf_config_primitive, result, result_detail))
3182 else:
3183 if (op_index == self.SUBOPERATION_STATUS_NEW):
3184 # New sub-operation: Get index of this sub-operation
3185 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3186 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3187 format(vnf_config_primitive))
3188 else:
3189 # Reintent: Get registered params for this existing sub-operation
3190 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3191 vnf_index = op.get('member_vnf_index')
3192 vnf_config_primitive = op.get('primitive')
3193 primitive_params = op.get('primitive_params')
3194 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3195 format(vnf_config_primitive))
3196 # Execute the primitive, either with new (first-time) or registered (reintent) args
3197 result, result_detail = await self._ns_execute_primitive(
3198 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3199 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3200 vnf_config_primitive, result, result_detail))
3201 # Update operationState = COMPLETED | FAILED
3202 self._update_suboperation_status(
3203 db_nslcmop, op_index, result, result_detail)
3204
3205 if result == "FAILED":
3206 raise LcmException(result_detail)
3207 db_nsr_update["config-status"] = old_config_status
3208 scale_process = None
3209 # PRE-SCALE END
3210
3211 # SCALE RO - BEGIN
3212 # Should this block be skipped if 'RO_nsr_id' == None ?
3213 # if (RO_nsr_id and RO_scaling_info):
3214 if RO_scaling_info:
3215 scale_process = "RO"
3216 # Scale RO reintent check: Check if this sub-operation has been executed before
3217 op_index = self._check_or_add_scale_suboperation(
3218 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3219 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3220 # Skip sub-operation
3221 result = 'COMPLETED'
3222 result_detail = 'Done'
3223 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3224 result, result_detail))
3225 else:
3226 if (op_index == self.SUBOPERATION_STATUS_NEW):
3227 # New sub-operation: Get index of this sub-operation
3228 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3229 self.logger.debug(logging_text + "New sub-operation RO")
3230 else:
3231 # Reintent: Get registered params for this existing sub-operation
3232 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3233 RO_nsr_id = op.get('RO_nsr_id')
3234 RO_scaling_info = op.get('RO_scaling_info')
3235 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3236 vnf_config_primitive))
3237
3238 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3239 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3240 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3241 # wait until ready
3242 RO_nslcmop_id = RO_desc["instance_action_id"]
3243 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3244
3245 RO_task_done = False
3246 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3247 detailed_status_old = None
3248 self.logger.debug(logging_text + step)
3249
3250 deployment_timeout = 1 * 3600 # One hour
3251 while deployment_timeout > 0:
3252 if not RO_task_done:
3253 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3254 extra_item_id=RO_nslcmop_id)
3255
3256 # deploymentStatus
3257 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3258
3259 ns_status, ns_status_info = self.RO.check_action_status(desc)
3260 if ns_status == "ERROR":
3261 raise ROclient.ROClientException(ns_status_info)
3262 elif ns_status == "BUILD":
3263 detailed_status = step + "; {}".format(ns_status_info)
3264 elif ns_status == "ACTIVE":
3265 RO_task_done = True
3266 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3267 self.logger.debug(logging_text + step)
3268 else:
3269 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3270 else:
3271
3272 if ns_status == "ERROR":
3273 raise ROclient.ROClientException(ns_status_info)
3274 elif ns_status == "BUILD":
3275 detailed_status = step + "; {}".format(ns_status_info)
3276 elif ns_status == "ACTIVE":
3277 step = detailed_status = \
3278 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3279 if not vnfr_scaled:
3280 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3281 vnfr_scaled = True
3282 try:
3283 desc = await self.RO.show("ns", RO_nsr_id)
3284
3285 # deploymentStatus
3286 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3287
3288 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3289 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3290 break
3291 except LcmExceptionNoMgmtIP:
3292 pass
3293 else:
3294 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3295 if detailed_status != detailed_status_old:
3296 self._update_suboperation_status(
3297 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3298 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3299 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3300
3301 await asyncio.sleep(5, loop=self.loop)
3302 deployment_timeout -= 5
3303 if deployment_timeout <= 0:
3304 self._update_suboperation_status(
3305 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3306 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3307
3308 # update VDU_SCALING_INFO with the obtained ip_addresses
3309 if vdu_scaling_info["scaling_direction"] == "OUT":
3310 for vdur in reversed(db_vnfr["vdur"]):
3311 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3312 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3313 vdu_scaling_info["vdu"].append({
3314 "name": vdur["name"],
3315 "vdu_id": vdur["vdu-id-ref"],
3316 "interface": []
3317 })
3318 for interface in vdur["interfaces"]:
3319 vdu_scaling_info["vdu"][-1]["interface"].append({
3320 "name": interface["name"],
3321 "ip_address": interface["ip-address"],
3322 "mac_address": interface.get("mac-address"),
3323 })
3324 del vdu_scaling_info["vdu-create"]
3325
3326 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3327 # SCALE RO - END
3328
3329 scale_process = None
3330 if db_nsr_update:
3331 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3332
3333 # POST-SCALE BEGIN
3334 # execute primitive service POST-SCALING
3335 step = "Executing post-scale vnf-config-primitive"
3336 if scaling_descriptor.get("scaling-config-action"):
3337 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3338 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3339 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3340 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3341 step = db_nslcmop_update["detailed-status"] = \
3342 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3343
3344 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3345 if db_vnfr.get("additionalParamsForVnf"):
3346 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3347
3348 # look for primitive
3349 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3350 if config_primitive["name"] == vnf_config_primitive:
3351 break
3352 else:
3353 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3354 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3355 "match any vnf-configuration:config-primitive".format(scaling_group,
3356 config_primitive))
3357 scale_process = "VCA"
3358 db_nsr_update["config-status"] = "configuring post-scaling"
3359 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3360
3361 # Post-scale reintent check: Check if this sub-operation has been executed before
3362 op_index = self._check_or_add_scale_suboperation(
3363 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3364 if op_index == self.SUBOPERATION_STATUS_SKIP:
3365 # Skip sub-operation
3366 result = 'COMPLETED'
3367 result_detail = 'Done'
3368 self.logger.debug(logging_text +
3369 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3370 format(vnf_config_primitive, result, result_detail))
3371 else:
3372 if op_index == self.SUBOPERATION_STATUS_NEW:
3373 # New sub-operation: Get index of this sub-operation
3374 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3375 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3376 format(vnf_config_primitive))
3377 else:
3378 # Reintent: Get registered params for this existing sub-operation
3379 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3380 vnf_index = op.get('member_vnf_index')
3381 vnf_config_primitive = op.get('primitive')
3382 primitive_params = op.get('primitive_params')
3383 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3384 format(vnf_config_primitive))
3385 # Execute the primitive, either with new (first-time) or registered (reintent) args
3386 result, result_detail = await self._ns_execute_primitive(
3387 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3388 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3389 vnf_config_primitive, result, result_detail))
3390 # Update operationState = COMPLETED | FAILED
3391 self._update_suboperation_status(
3392 db_nslcmop, op_index, result, result_detail)
3393
3394 if result == "FAILED":
3395 raise LcmException(result_detail)
3396 db_nsr_update["config-status"] = old_config_status
3397 scale_process = None
3398 # POST-SCALE END
3399
3400 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3401 db_nslcmop_update["statusEnteredTime"] = time()
3402 db_nslcmop_update["detailed-status"] = "done"
3403 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3404 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3405 else old_operational_status
3406 db_nsr_update["config-status"] = old_config_status
3407 return
3408 except (ROclient.ROClientException, DbException, LcmException) as e:
3409 self.logger.error(logging_text + "Exit Exception {}".format(e))
3410 exc = e
3411 except asyncio.CancelledError:
3412 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3413 exc = "Operation was cancelled"
3414 except Exception as e:
3415 exc = traceback.format_exc()
3416 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3417 finally:
3418 self._write_ns_status(
3419 nsr_id=nsr_id,
3420 ns_state=None,
3421 current_operation="IDLE",
3422 current_operation_id=None
3423 )
3424 if exc:
3425 if db_nslcmop:
3426 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3427 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3428 db_nslcmop_update["statusEnteredTime"] = time()
3429 if db_nsr:
3430 db_nsr_update["operational-status"] = old_operational_status
3431 db_nsr_update["config-status"] = old_config_status
3432 db_nsr_update["detailed-status"] = ""
3433 db_nsr_update["_admin.nslcmop"] = None
3434 if scale_process:
3435 if "VCA" in scale_process:
3436 db_nsr_update["config-status"] = "failed"
3437 if "RO" in scale_process:
3438 db_nsr_update["operational-status"] = "failed"
3439 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3440 exc)
3441 try:
3442 if db_nslcmop and db_nslcmop_update:
3443 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3444 if db_nsr:
3445 db_nsr_update["_admin.current-operation"] = None
3446 db_nsr_update["_admin.operation-type"] = None
3447 db_nsr_update["_admin.nslcmop"] = None
3448 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3449
3450 self._write_ns_status(
3451 nsr_id=nsr_id,
3452 ns_state=None,
3453 current_operation="IDLE",
3454 current_operation_id=None
3455 )
3456
3457 except DbException as e:
3458 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3459 if nslcmop_operation_state:
3460 try:
3461 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3462 "operationState": nslcmop_operation_state},
3463 loop=self.loop)
3464 # if cooldown_time:
3465 # await asyncio.sleep(cooldown_time, loop=self.loop)
3466 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3467 except Exception as e:
3468 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3469 self.logger.debug(logging_text + "Exit")
3470 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")