timeout parametrization to allow larger deployments
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
# Class-level default timeouts, written as products of 60/3600 (presumably seconds - TODO confirm).
timeout_vca_on_error = 5 * 60   # Time for charm from first time at blocked,error status to mark as failed
# Fallback used by instantiate_RO when neither operationParams nor config["timeout"] provide 'ns_deploy'
timeout_ns_deploy = 2 * 3600   # default global timeout for deployment a ns
timeout_charm_delete = 10 * 60   # NOTE(review): by its name, the time allowed to delete a charm - confirm at usage
timeout_primitive = 10 * 60   # timeout for primitive execution

# Negative markers for sub-operation lookups (their consumers are outside this section of the file)
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
55
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Init: stores the shared handlers and creates the N2VC (juju), K8s (helm and juju) and RO clients
    :param db: database handler, given to LcmBase and to the N2VC/K8s connectors
    :param msg: messaging handler, given to LcmBase
    :param fs: filesystem storage handler, given to LcmBase and to the N2VC/K8s connectors
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: two level dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read here
    :param loop: event loop given to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy, so the caller configuration is not shared with the connectors
    self.vca_config = config["VCA"].copy()
    vca = self.vca_config

    # create N2VC connector
    vca_url = '{}:{}'.format(vca['host'], vca['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=vca.get('user', None),
        vca_config=vca,
        on_update_db=self._on_update_n2vc_db
    )

    # K8s connectors: one for helm charts, one for juju bundles
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=vca.get("kubectlpath"),
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=vca.get("kubectlpath"),
        juju_command=vca.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
107
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """
    Stores the RO descriptor at 'deploymentStatus' of the ns record. Best effort: a failure is logged as a
    warning and never raised
    :param nsrs_id: _id of the record at database 'nsrs'
    :param ro_descriptor: content to store; it is written as it is received
    :return: None
    """
    try:
        # TODO filter RO descriptor fields...

        # write to database
        self.update_db_2("nsrs", nsrs_id, {'deploymentStatus': ro_descriptor})

    except Exception as e:
        # Logger.warn() is a deprecated alias of Logger.warning()
        self.logger.warning('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback registered at the N2VC connector (see 'on_update_db' at __init__). Reads the ns record, asks N2VC
    for the status of the ns and writes at database 'nsrs': 'vcaStatus', the configurationStatus of the affected
    VCA, and 'errorDescription'/'nsState' when the ns moves between READY and DEGRADED.
    Best effort: errors are logged and never raised
    :param table: not used here; the record is always read from and written to 'nsrs'
    :param filter: database filter of the ns record; its '_id' is taken as the nsr id
    :param path: dotted path of the modified item. Its last component is used as index at _admin.deployed.VCA
    :param updated_data: not used here
    :return: None
    """
    # remove last dot from path (if exists)
    if path.endswith('.'):
        path = path[:-1]

    # bound before the 'try', so the final 'except' can always log it
    nsr_id = None
    try:
        nsr_id = filter.get('_id')

        # read ns record from database
        nsr = self.db.get_one(table='nsrs', q_filter=filter)
        current_ns_status = nsr.get('nsState')

        # get vca status for NS
        status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)

        # vcaStatus
        db_dict = {'vcaStatus': status_dict}

        # update configurationStatus for this VCA
        try:
            vca_index = int(path[path.rfind(".")+1:])

            vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
            vca_status = vca_list[vca_index].get('status')

            configuration_status_list = nsr.get('configurationStatus')
            config_status = configuration_status_list[vca_index].get('status')

            # A dotted key updates only the status of this entry. Indexing db_dict['configurationStatus'],
            # a key that db_dict never contains, always failed and the change was silently lost
            config_status_key = 'configurationStatus.{}.status'.format(vca_index)
            if config_status == 'BROKEN' and vca_status != 'failed':
                db_dict[config_status_key] = 'READY'
            elif config_status != 'BROKEN' and vca_status == 'failed':
                db_dict[config_status_key] = 'BROKEN'
        except Exception as e:
            # not update configurationStatus
            self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        if current_ns_status in ('READY', 'DEGRADED'):
            is_degraded = False
            error_description = ''
            # check machines
            for machine_id, machine in (status_dict.get('machines') or {}).items():
                # check machine agent-status
                if machine.get('agent-status'):
                    s = machine.get('agent-status').get('status')
                    if s != 'started':
                        is_degraded = True
                        error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
                # check machine instance status
                if machine.get('instance-status'):
                    s = machine.get('instance-status').get('status')
                    if s != 'running':
                        is_degraded = True
                        error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
            # check applications
            for app_id, app in (status_dict.get('applications') or {}).items():
                # check application status
                if app.get('status'):
                    s = app.get('status').get('status')
                    if s != 'active':
                        is_degraded = True
                        error_description += 'application {} status={} ; '.format(app_id, s)

            if error_description:
                db_dict['errorDescription'] = error_description
            if current_ns_status == 'READY' and is_degraded:
                db_dict['nsState'] = 'DEGRADED'
            if current_ns_status == 'DEGRADED' and not is_degraded:
                db_dict['nsState'] = 'READY'

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except Exception as e:
        # Logger.warn() is a deprecated alias of Logger.warning()
        self.logger.warning('Error updating NS state for ns={}: {}'.format(nsr_id, e))

    return
212
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The cloud-init (or cloud-init-file)
    of every vdu is rendered with Jinja2 using the instantiation parameters
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Every variable used at cloud-init must be here
    :param nsrId: Id of the NSR. Currently not used
    :return: copy of vnfd, without the keys that RO does not use and with 'cloud-init' already rendered
    :raises LcmException: if the cloud-init file cannot be read, a needed variable is not provided or the
        template is not valid
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # NOTE(review): templates come from the vnfd package and are rendered without a sandbox. Consider
        # jinja2.sandbox.SandboxedEnvironment if descriptors can be provided by untrusted users
        env = Environment()   # only used for parsing; created once, not once per vdu

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                base_folder = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                continue

            # all the variables used by the template must come at the instantiation parameters
            ast = env.parse(cloud_init_content)
            for var in meta.find_undeclared_variables(ast):
                if not additionalParams or var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            template = Template(cloud_init_content)
            vdu["cloud-init"] = template.render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        # only raised while reading the cloud-init-file, so 'vdu' and 'cloud_init_file' are bound here
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e)) from e
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e)) from e
270
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. If empty, None is returned
    :param nsd: ns descriptor. Its 'constituent-vnfd' relates each member-vnf-index with a vnfd-id-ref
    :param vnfd_dict: dictionary with the vnfd contents, indexed by the vnfd id used at nsd:constituent-vnfd
    :param n2vc_key_list: stored as 'mgmt_keys' of every vdu that requires ssh access. None is taken as []
    :return: The RO ns descriptor
    :raises LcmException: if a VIM/WIM account is not ENABLED, or a parameter references a member-vnf-index or
        connection point that is not present at the descriptors
    """
    vim_2_RO = {}   # cache: OSM vim account id -> RO datacenter id
    wim_2_RO = {}   # cache: OSM wim account id -> RO wim account id
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only str values are account ids to translate; any other value (None, bool...) is returned as it is
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # translates the OSM ip-profile keys and values to the ones used by RO, over a copy
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the management keys at the vdus that need ssh access for its configuration
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # a vdu without interfaces cannot hold the management connection point
                for vdu_interface in vdu.get("interface") or ():
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in nsd.get("constituent-vnfd") or ():
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # instantiation parameters per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu interface is attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # instantiation parameters per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # here vdu_descriptor/interface_descriptor are the ones that matched the connection point
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
541
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr and writes the resulting 'vdur' list at database 'vnfrs'
    :param db_vnfr: vnfr content. Its 'vdur' list is modified
    :param vdu_create: dictionary {vdu-id-ref: number of vdur to add}. New entries are copies of an existing vdur
        of the same vdu-id-ref, with a new _id and the next count-index. It is not modified
    :param vdu_delete: dictionary {vdu-id-ref: number of vdur to remove}. It is not modified
    :return: None
    :raises LcmException: if some requested creation or deletion cannot be done because there is not any (or
        not enough) vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk from the end, so inserting or deleting does not move the entries still to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # every new entry is a copy of the previous one, with the following count-index
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. What remains at the dictionaries could not be applied.
    # Each direction is checked on its own, so a pending deletion is reported as scaling IN
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
580
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the VIM information reported by RO for its network
    :param ns_update_nsr: dictionary to be filled with the updated info, one "vld.<index>" key per vld
    :param db_nsr: content of db_nsr. Its vld entries are modified
    :param nsr_desc_RO: nsr descriptor from RO. The network of a vld is the first item of 'nets' whose
        'ns_net_osm_id' is the vld id
    :return: Nothing, LcmException is raised when a vld has no network at RO info
    """
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        ro_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if net.get("ns_net_osm_id") == vld["id"]),
            None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld["vim-id"] = ro_net.get("vim_net_id")
        vld["name"] = ro_net.get("vim_name")
        vld["status"] = ro_net.get("status")
        vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = vld
602
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' (with their 'vms') and 'nets' are read
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when RO reports no
        ip_address for a vnf and the vnfr has not got one yet
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf of this member-vnf-index; the final 'else' raises if there is not any
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the first item of the ';' separated value is taken
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms of the same vdu skipped so far, to pair each vdur with the vm at the
                # position given by its count-index
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every interface of the vdur must be reported by RO, matched by name against internal_name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm with this vdu id at the position of this count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf, matched against the RO nets by 'vnf_net_osm_id'
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # one database write per vnfr, with all the changes collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
674
def _get_ns_config_info(self, nsr_id):
    """
    Builds the mapping between the vnf/vdu elements of a ns and the application deployed for each of them
    :param nsr_id: id of nsr, used to read from database the current _admin.deployed.VCA list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <application> for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <application> for a vdu configuration
        Entries without member-vnf-index are not included
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    mapping = {}
    for vca in db_nsr["_admin"]["deployed"]["VCA"]:
        member_vnf_index = vca["member-vnf-index"]
        if not member_vnf_index:
            continue
        vdu_id = vca["vdu_id"]
        application = vca["application"]
        if vdu_id:
            element = "{}.{}.{}".format(member_vnf_index, vdu_id, vca["vdu_count_index"])
        else:
            element = member_vnf_index
        mapping[element] = application
    return {"osm-config-mapping": mapping}
696
@staticmethod
def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
    """
    Builds the list of initial-config-primitive to execute from the one of the descriptor, adding the internal
    primitives "config" or "verify-ssh-credentials" when needed
    :param desc_primitive_list: information of the descriptor. It is not modified
    :param vca_deployed: information of the deployed element. 'member-vnf-index' is empty for an NS; for a
        VNF/VDU, 'ssh-public-key' tells whether the ssh credentials must be verified
    :return: The new list. Can be an empty list, but always a list
    """
    primitive_list = desc_primitive_list.copy() if desc_primitive_list else []

    # position of the first primitive "config". None if not present
    config_position = next(
        (position for position, primitive in enumerate(primitive_list) if primitive["name"] == "config"),
        None)

    if not vca_deployed["member-vnf-index"]:
        # for NS, add always a config primitive if not present (bug 874)
        if config_position is None:
            primitive_list.insert(0, {"name": "config", "parameter": []})
    elif config_position is not None and vca_deployed.get("ssh-public-key"):
        # for VNF/VDU add verify-ssh-credentials after config
        primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
    return primitive_list
726
727 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
728 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
729
730 db_nsr_update = {}
731 RO_descriptor_number = 0 # number of descriptors created at RO
732 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
733 start_deploy = time()
734 vdu_flag = False # If any of the VNFDs has VDUs
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # deploy RO
742
743 # get vnfds, instantiate at RO
744
745 for c_vnf in nsd.get("constituent-vnfd", ()):
746 member_vnf_index = c_vnf["member-vnf-index"]
747 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
748 if vnfd.get("vdu"):
749 vdu_flag = True
750 vnfd_ref = vnfd["id"]
751 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
752 " RO".format(vnfd_ref, member_vnf_index)
753 # self.logger.debug(logging_text + step)
754 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
755 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
756 RO_descriptor_number += 1
757
758 # look position at deployed.RO.vnfd if not present it will be appended at the end
759 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
760 if vnf_deployed["member-vnf-index"] == member_vnf_index:
761 break
762 else:
763 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
764 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
765
766 # look if present
767 RO_update = {"member-vnf-index": member_vnf_index}
768 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
769 if vnfd_list:
770 RO_update["id"] = vnfd_list[0]["uuid"]
771 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
772 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
773 else:
774 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
775 get("additionalParamsForVnf"), nsr_id)
776 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
777 RO_update["id"] = desc["uuid"]
778 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
779 vnfd_ref, member_vnf_index, desc["uuid"]))
780 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
781 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
782 self.update_db_2("nsrs", nsr_id, db_nsr_update)
783
784 # create nsd at RO
785 nsd_ref = nsd["id"]
786 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
787 # self.logger.debug(logging_text + step)
788
789 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
790 RO_descriptor_number += 1
791 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
792 if nsd_list:
793 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
794 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
795 nsd_ref, RO_nsd_uuid))
796 else:
797 nsd_RO = deepcopy(nsd)
798 nsd_RO["id"] = RO_osm_nsd_id
799 nsd_RO.pop("_id", None)
800 nsd_RO.pop("_admin", None)
801 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
802 member_vnf_index = c_vnf["member-vnf-index"]
803 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
804 for c_vld in nsd_RO.get("vld", ()):
805 for cp in c_vld.get("vnfd-connection-point-ref", ()):
806 member_vnf_index = cp["member-vnf-index-ref"]
807 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
808
809 desc = await self.RO.create("nsd", descriptor=nsd_RO)
810 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
811 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
812 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
813 self.update_db_2("nsrs", nsr_id, db_nsr_update)
814
815 # Crate ns at RO
816 # if present use it unless in error status
817 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
818 if RO_nsr_id:
819 try:
820 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
821 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
822 desc = await self.RO.show("ns", RO_nsr_id)
823
824 except ROclient.ROClientException as e:
825 if e.http_code != HTTPStatus.NOT_FOUND:
826 raise
827 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
828 if RO_nsr_id:
829 ns_status, ns_status_info = self.RO.check_ns_status(desc)
830 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
831 if ns_status == "ERROR":
832 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
833 .format(RO_nsr_id)
834 self.logger.debug(logging_text + step)
835 await self.RO.delete("ns", RO_nsr_id)
836 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
837 if not RO_nsr_id:
838 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
839 # self.logger.debug(logging_text + step)
840
841 # check if VIM is creating and wait look if previous tasks in process
842 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
843 if task_dependency:
844 step = "Waiting for related tasks to be completed: {}".format(task_name)
845 self.logger.debug(logging_text + step)
846 await asyncio.wait(task_dependency, timeout=3600)
847 if ns_params.get("vnf"):
848 for vnf in ns_params["vnf"]:
849 if "vimAccountId" in vnf:
850 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
851 vnf["vimAccountId"])
852 if task_dependency:
853 step = "Waiting for related tasks to be completed: {}".format(task_name)
854 self.logger.debug(logging_text + step)
855 await asyncio.wait(task_dependency, timeout=3600)
856
857 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
858
859 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
860
861 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
862 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
863 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
864 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
865 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
866 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
867 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
868 self.update_db_2("nsrs", nsr_id, db_nsr_update)
869
870 # wait until NS is ready
871 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
872 detailed_status_old = None
873 self.logger.debug(logging_text + step)
874
875 old_desc = None
876 while time() <= start_deploy + timeout_ns_deploy:
877 desc = await self.RO.show("ns", RO_nsr_id)
878
879 # deploymentStatus
880 if desc != old_desc:
881 # desc has changed => update db
882 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
883 old_desc = desc
884
885 ns_status, ns_status_info = self.RO.check_ns_status(desc)
886 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
887 if ns_status == "ERROR":
888 raise ROclient.ROClientException(ns_status_info)
889 elif ns_status == "BUILD":
890 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
891 elif ns_status == "ACTIVE":
892 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
893 try:
894 if vdu_flag:
895 self.ns_update_vnfr(db_vnfrs, desc)
896 break
897 except LcmExceptionNoMgmtIP:
898 pass
899 else:
900 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
901 if detailed_status != detailed_status_old:
902 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 await asyncio.sleep(5, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise ROclient.ROClientException("Timeout waiting ns to be ready")
907
908 step = "Updating NSR"
909 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
910
911 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
912 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
913 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
915 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
916
917 step = "Deployed at VIM"
918 self.logger.debug(logging_text + step)
919
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for the IP address of the target VM and, optionally, insert a public ssh key into it through RO.

    Polls every 10 seconds: first the "nsrs" record until "_admin.deployed.RO.nsr_id" is set, then the
    "vnfrs" record until the target vdur has status "ACTIVE" and an "ip-address". When both pub_key and
    user are provided, the key is afterwards injected with an RO "add_public_key" action, retrying on
    ROClientException.

    :param logging_text: prefix use for logging
    :param nsr_id: _id of the "nsrs" record to poll
    :param vnfr_id: _id of the "vnfrs" record to poll
    :param vdu_id: "vdu-id-ref" of the target vdur. With a false value (VNF case) the target is the vdur
        whose "ip-address" equals the "ip-address" of the vnfr
    :param vdu_index: "count-index" of the target vdur; only used when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: when the polling rounds are exhausted (360 rounds, about 1 hour), when the vdur is
        not found or is in "ERROR" status, when RO answers with an unexpected content, or after 20 failed
        attempts of key injection
    """
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:
        # every round (also the retries of key injection) counts for this global limit
        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # no explicit 'loop' argument: it is deprecated since python 3.8 and rejected from python 3.10
        await asyncio.sleep(10)

        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # nothing to inject: the IP address is all what was requested
        if not (pub_key and user):
            break

        # inject public key into machine
        try:
            ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
            result_dict = await self.RO.create_action(
                item="ns",
                item_id_name=ro_nsr_id,
                descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
            )
            # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
            if not result_dict or not isinstance(result_dict, dict):
                raise LcmException("Unknown response from RO when injecting key")
            # only one VM is requested, so only the first entry decides the result
            for result in result_dict.values():
                if result.get("vim_result") == 200:
                    break
                else:
                    raise ROclient.ROClientException("error injecting key: {}".format(
                        result.get("description")))
            break
        except ROclient.ROClientException as e:
            if not nb_tries:
                self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                  format(e, 20*10))
            nb_tries += 1
            if nb_tries >= 20:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e

    return ip_address
1016
1017 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1018 """
1019 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1020 """
1021 my_vca = vca_deployed_list[vca_index]
1022 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1023 # vdu or kdu: no dependencies
1024 return
1025 timeout = 300
1026 while timeout >= 0:
1027 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1028 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1029 configuration_status_list = db_nsr["configurationStatus"]
1030 for index, vca_deployed in enumerate(configuration_status_list):
1031 if index == vca_index:
1032 # myself
1033 continue
1034 if not my_vca.get("member-vnf-index") or \
1035 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1036 internal_status = configuration_status_list[index].get("status")
1037 if internal_status == 'READY':
1038 continue
1039 elif internal_status == 'BROKEN':
1040 raise LcmException("Configuration aborted because dependent charm/s has failed")
1041 else:
1042 break
1043 else:
1044 # no dependencies, return
1045 return
1046 await asyncio.sleep(10)
1047 timeout -= 1
1048
1049 raise LcmException("Configuration aborted because dependent charm/s timeout")
1050
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
                           vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
    """
    Deploy and configure the VCA element stored at position vca_index of db_nsr["_admin"]["deployed"]["VCA"].

    Steps, in this order:
      1. create (proxy charm) or register (native charm) the execution environment at N2VC
      2. store "model", "application" and "ee_id" under "_admin.deployed.VCA.<vca_index>." of the nsr
      3. install the configuration software (charm) in the execution environment
      4. for proxy charms, wait for the VM IP address, injecting the ssh public key of the execution
         environment when the descriptor says that ssh access is required
      5. wait for dependent elements and execute the initial config primitives
    The progress is written at "configurationStatus.<vca_index>" of the nsr: 'CREATING' or 'REGISTERING',
    'INSTALLING SW', 'EXECUTING PRIMITIVE' and finally 'READY', or 'BROKEN' if anything fails.

    :param logging_text: prefix use for logging
    :param vca_index: position of this element at the "_admin.deployed.VCA" list of the nsr
    :param nsi_id: first component of the namespace; an empty string is used when it is a false value
    :param db_nsr: nsr record; "_id" and "_admin.deployed.VCA" are read
    :param db_vnfr: vnfr record, or a false value when the element is the NS itself
    :param vdu_id: when provided together with db_vnfr the element is a VDU
    :param kdu_name: not used by this method
    :param vdu_index: instance index of the VDU; 0 is used for the namespace when it is a false value
    :param config_descriptor: configuration descriptor; "juju", "config-access" and
        "initial-config-primitive" are read
    :param deploy_params: dictionary used to map the primitive parameters. It is modified: "rw_mgmt_ip" is
        added, and also "ns_config_info" when the element has no "member-vnf-index"
    :param base_folder: dictionary with "folder" and "pkg-dir", used to compose the path of the charm
    :param nslcmop_id: operation whose "stage" is updated before executing the primitives
    :return: None
    :raises Exception: any failure is raised again as a generic Exception whose text is the current step
        followed by the original error (chained as its cause), after writing the 'BROKEN' status
    """
    nsr_id = db_nsr["_id"]
    # prefix of the database keys of this element; it is also the 'path' provided to N2VC at db_dict
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # 'step' always describes what is being done; it is used as prefix of the error raised at the end
    step = ""
    try:

        # element type, identity and namespace get more specific as vnfr and vdu are provided:
        # NS -> "<nsi>.<ns>", VNF -> "<nsi>.<ns>.<vnfr>", VDU -> "<nsi>.<ns>.<vnfr>.<vdu>-<index>"
        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor contains explicitly juju.proxy = False (native charm)
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            await self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # native charm: the VM itself is registered, so its IP address and a username are needed first
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            # only the inner loop is left; a later primitive with 'ssh-username' overrides it
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            await self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        # a failure is only logged; this includes a descriptor without 'initial-config-primitive', where
        # the list is None and cannot be sorted
        try:
            initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
        except Exception as e:
            self.logger.error(logging_text + step + ": " + str(e))

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage = 'Stage 3/5: running Day-1 primitives for VDU'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage = 'Stage 4/5: running Day-1 primitives for VNF'
        else:
            # NS
            stage = 'Stage 5/5: running Day-1 primitives for NS'

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # mark the element as failed; _wait_dependent_n2vc of other elements aborts on this status
        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1283
1284 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1285 error_description: str = None):
1286 try:
1287 db_dict = dict()
1288 if ns_state:
1289 db_dict["nsState"] = ns_state
1290 db_dict["currentOperation"] = current_operation
1291 db_dict["currentOperationID"] = current_operation_id
1292 db_dict["errorDescription"] = error_description
1293 self.update_db_2("nsrs", nsr_id, db_dict)
1294 except Exception as e:
1295 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1296
1297 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1298 try:
1299 db_dict = dict()
1300 db_dict['queuePosition'] = queuePosition
1301 db_dict['stage'] = stage
1302 if error_message:
1303 db_dict['errorMessage'] = error_message
1304 self.update_db_2("nslcmops", op_id, db_dict)
1305 except Exception as e:
1306 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1307
1308 def _write_all_config_status(self, nsr_id: str, status: str):
1309 try:
1310 # nsrs record
1311 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1312 # configurationStatus
1313 config_status = db_nsr.get('configurationStatus')
1314 if config_status:
1315 # update status
1316 db_dict = dict()
1317 db_dict['configurationStatus'] = list()
1318 for c in config_status:
1319 c['status'] = status
1320 db_dict['configurationStatus'].append(c)
1321 self.update_db_2("nsrs", nsr_id, db_dict)
1322
1323 except Exception as e:
1324 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1325
1326 async def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str,
1327 element_under_configuration: str = None, element_type: str = None):
1328
1329 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1330 # .format(vca_index, status))
1331
1332 try:
1333 db_path = 'configurationStatus.{}.'.format(vca_index)
1334 db_dict = dict()
1335 db_dict[db_path + 'status'] = status
1336 if element_under_configuration:
1337 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1338 if element_type:
1339 db_dict[db_path + 'elementType'] = element_type
1340 self.update_db_2("nsrs", nsr_id, db_dict)
1341 except Exception as e:
1342 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1343 .format(status, nsr_id, vca_index, e))
1344
1345 async def instantiate(self, nsr_id, nslcmop_id):
1346 """
1347
1348 :param nsr_id: ns instance to deploy
1349 :param nslcmop_id: operation to run
1350 :return:
1351 """
1352
1353 # Try to lock HA task here
1354 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1355 if not task_is_locked_by_me:
1356 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1357 return
1358
1359 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1360 self.logger.debug(logging_text + "Enter")
1361
1362 # get all needed from database
1363
1364 # database nsrs record
1365 db_nsr = None
1366
1367 # database nslcmops record
1368 db_nslcmop = None
1369
1370 # update operation on nsrs
1371 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1372 "_admin.current-operation": nslcmop_id,
1373 "_admin.operation-type": "instantiate"}
1374 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1375
1376 # update operation on nslcmops
1377 db_nslcmop_update = {}
1378
1379 nslcmop_operation_state = None
1380 db_vnfrs = {} # vnf's info indexed by member-index
1381 # n2vc_info = {}
1382 task_instantiation_list = []
1383 task_instantiation_info = {} # from task to info text
1384 exc = None
1385 try:
1386 # wait for any previous tasks in process
1387 step = "Waiting for previous operations to terminate"
1388 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1389
1390 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1391
1392 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1393 self._write_ns_status(
1394 nsr_id=nsr_id,
1395 ns_state="BUILDING",
1396 current_operation="INSTANTIATING",
1397 current_operation_id=nslcmop_id
1398 )
1399
1400 # read from db: operation
1401 step = "Getting nslcmop={} from db".format(nslcmop_id)
1402 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1403 ns_params = db_nslcmop.get("operationParams")
1404 if ns_params and ns_params.get("timeout_ns_deploy"):
1405 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1406 else:
1407 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1408
1409 # read from db: ns
1410 step = "Getting nsr={} from db".format(nsr_id)
1411 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1412 # nsd is replicated into ns (no db read)
1413 nsd = db_nsr["nsd"]
1414 # nsr_name = db_nsr["name"] # TODO short-name??
1415
1416 # read from db: vnf's of this ns
1417 step = "Getting vnfrs from db"
1418 self.logger.debug(logging_text + step)
1419 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1420
1421 # read from db: vnfd's for every vnf
1422 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1423 db_vnfds = {} # every vnfd data indexed by vnf id
1424 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1425
1426 self._write_op_status(
1427 op_id=nslcmop_id,
1428 stage='Stage 1/5: preparation of the environment',
1429 queuePosition=0
1430 )
1431
1432 # for each vnf in ns, read vnfd
1433 for vnfr in db_vnfrs_list:
1434 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1435 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1436 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1437 # if we haven't this vnfd, read it from db
1438 if vnfd_id not in db_vnfds:
1439 # read from cb
1440 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1441 self.logger.debug(logging_text + step)
1442 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1443
1444 # store vnfd
1445 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1446 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1447 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1448
1449 # Get or generates the _admin.deployed.VCA list
1450 vca_deployed_list = None
1451 if db_nsr["_admin"].get("deployed"):
1452 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1453 if vca_deployed_list is None:
1454 vca_deployed_list = []
1455 configuration_status_list = []
1456 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1457 db_nsr_update["configurationStatus"] = configuration_status_list
1458 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1459 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1460 elif isinstance(vca_deployed_list, dict):
1461 # maintain backward compatibility. Change a dict to list at database
1462 vca_deployed_list = list(vca_deployed_list.values())
1463 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1464 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1465
1466 db_nsr_update["detailed-status"] = "creating"
1467 db_nsr_update["operational-status"] = "init"
1468
1469 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1470 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1471 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1472
1473 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1474 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1475 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1476
1477 # n2vc_redesign STEP 2 Deploy Network Scenario
1478
1479 self._write_op_status(
1480 op_id=nslcmop_id,
1481 stage='Stage 2/5: deployment of VMs and execution environments'
1482 )
1483
1484 self.logger.debug(logging_text + "Before deploy_kdus")
1485 # Call to deploy_kdus in case exists the "vdu:kdu" param
1486 task_kdu = asyncio.ensure_future(
1487 self.deploy_kdus(
1488 logging_text=logging_text,
1489 nsr_id=nsr_id,
1490 db_nsr=db_nsr,
1491 db_vnfrs=db_vnfrs,
1492 )
1493 )
1494 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1495 task_instantiation_info[task_kdu] = "Deploy KDUs"
1496 task_instantiation_list.append(task_kdu)
1497 # n2vc_redesign STEP 1 Get VCA public ssh-key
1498 # feature 1429. Add n2vc public key to needed VMs
1499 n2vc_key = self.n2vc.get_public_key()
1500 n2vc_key_list = [n2vc_key]
1501 if self.vca_config.get("public_key"):
1502 n2vc_key_list.append(self.vca_config["public_key"])
1503
1504 task_ro = asyncio.ensure_future(
1505 self.instantiate_RO(
1506 logging_text=logging_text,
1507 nsr_id=nsr_id,
1508 nsd=nsd,
1509 db_nsr=db_nsr,
1510 db_nslcmop=db_nslcmop,
1511 db_vnfrs=db_vnfrs,
1512 db_vnfds_ref=db_vnfds_ref,
1513 n2vc_key_list=n2vc_key_list
1514 )
1515 )
1516 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1517 task_instantiation_info[task_ro] = "Deploy at VIM"
1518 task_instantiation_list.append(task_ro)
1519
1520 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1521 step = "Looking for needed vnfd to configure with proxy charm"
1522 self.logger.debug(logging_text + step)
1523
1524 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1525 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1526 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1527 vnfd_id = c_vnf["vnfd-id-ref"]
1528 vnfd = db_vnfds_ref[vnfd_id]
1529 member_vnf_index = str(c_vnf["member-vnf-index"])
1530 db_vnfr = db_vnfrs[member_vnf_index]
1531 base_folder = vnfd["_admin"]["storage"]
1532 vdu_id = None
1533 vdu_index = 0
1534 vdu_name = None
1535 kdu_name = None
1536
1537 # Get additional parameters
1538 deploy_params = {}
1539 if db_vnfr.get("additionalParamsForVnf"):
1540 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1541
1542 descriptor_config = vnfd.get("vnf-configuration")
1543 if descriptor_config and descriptor_config.get("juju"):
1544 self._deploy_n2vc(
1545 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1546 db_nsr=db_nsr,
1547 db_vnfr=db_vnfr,
1548 nslcmop_id=nslcmop_id,
1549 nsr_id=nsr_id,
1550 nsi_id=nsi_id,
1551 vnfd_id=vnfd_id,
1552 vdu_id=vdu_id,
1553 kdu_name=kdu_name,
1554 member_vnf_index=member_vnf_index,
1555 vdu_index=vdu_index,
1556 vdu_name=vdu_name,
1557 deploy_params=deploy_params,
1558 descriptor_config=descriptor_config,
1559 base_folder=base_folder,
1560 task_instantiation_list=task_instantiation_list,
1561 task_instantiation_info=task_instantiation_info
1562 )
1563
1564 # Deploy charms for each VDU that supports one.
1565 for vdud in get_iterable(vnfd, 'vdu'):
1566 vdu_id = vdud["id"]
1567 descriptor_config = vdud.get('vdu-configuration')
1568 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1569 if vdur.get("additionalParams"):
1570 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1571 else:
1572 deploy_params_vdu = deploy_params
1573 if descriptor_config and descriptor_config.get("juju"):
1574 # look for vdu index in the db_vnfr["vdu"] section
1575 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1576 # if vdur["vdu-id-ref"] == vdu_id:
1577 # break
1578 # else:
1579 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1580 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1581 # vdu_name = vdur.get("name")
1582 vdu_name = None
1583 kdu_name = None
1584 for vdu_index in range(int(vdud.get("count", 1))):
1585 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1586 self._deploy_n2vc(
1587 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1588 member_vnf_index, vdu_id, vdu_index),
1589 db_nsr=db_nsr,
1590 db_vnfr=db_vnfr,
1591 nslcmop_id=nslcmop_id,
1592 nsr_id=nsr_id,
1593 nsi_id=nsi_id,
1594 vnfd_id=vnfd_id,
1595 vdu_id=vdu_id,
1596 kdu_name=kdu_name,
1597 member_vnf_index=member_vnf_index,
1598 vdu_index=vdu_index,
1599 vdu_name=vdu_name,
1600 deploy_params=deploy_params_vdu,
1601 descriptor_config=descriptor_config,
1602 base_folder=base_folder,
1603 task_instantiation_list=task_instantiation_list,
1604 task_instantiation_info=task_instantiation_info
1605 )
1606 for kdud in get_iterable(vnfd, 'kdu'):
1607 kdu_name = kdud["name"]
1608 descriptor_config = kdud.get('kdu-configuration')
1609 if descriptor_config and descriptor_config.get("juju"):
1610 vdu_id = None
1611 vdu_index = 0
1612 vdu_name = None
1613 # look for vdu index in the db_vnfr["vdu"] section
1614 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1615 # if vdur["vdu-id-ref"] == vdu_id:
1616 # break
1617 # else:
1618 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1619 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1620 # vdu_name = vdur.get("name")
1621 # vdu_name = None
1622
1623 self._deploy_n2vc(
1624 logging_text=logging_text,
1625 db_nsr=db_nsr,
1626 db_vnfr=db_vnfr,
1627 nslcmop_id=nslcmop_id,
1628 nsr_id=nsr_id,
1629 nsi_id=nsi_id,
1630 vnfd_id=vnfd_id,
1631 vdu_id=vdu_id,
1632 kdu_name=kdu_name,
1633 member_vnf_index=member_vnf_index,
1634 vdu_index=vdu_index,
1635 vdu_name=vdu_name,
1636 deploy_params=deploy_params,
1637 descriptor_config=descriptor_config,
1638 base_folder=base_folder,
1639 task_instantiation_list=task_instantiation_list,
1640 task_instantiation_info=task_instantiation_info
1641 )
1642
1643 # Check if this NS has a charm configuration
1644 descriptor_config = nsd.get("ns-configuration")
1645 if descriptor_config and descriptor_config.get("juju"):
1646 vnfd_id = None
1647 db_vnfr = None
1648 member_vnf_index = None
1649 vdu_id = None
1650 kdu_name = None
1651 vdu_index = 0
1652 vdu_name = None
1653
1654 # Get additional parameters
1655 deploy_params = {}
1656 if db_nsr.get("additionalParamsForNs"):
1657 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1658 base_folder = nsd["_admin"]["storage"]
1659 self._deploy_n2vc(
1660 logging_text=logging_text,
1661 db_nsr=db_nsr,
1662 db_vnfr=db_vnfr,
1663 nslcmop_id=nslcmop_id,
1664 nsr_id=nsr_id,
1665 nsi_id=nsi_id,
1666 vnfd_id=vnfd_id,
1667 vdu_id=vdu_id,
1668 kdu_name=kdu_name,
1669 member_vnf_index=member_vnf_index,
1670 vdu_index=vdu_index,
1671 vdu_name=vdu_name,
1672 deploy_params=deploy_params,
1673 descriptor_config=descriptor_config,
1674 base_folder=base_folder,
1675 task_instantiation_list=task_instantiation_list,
1676 task_instantiation_info=task_instantiation_info
1677 )
1678
1679 # Wait until all tasks of "task_instantiation_list" have been finished
1680
1681 error_text_list = []
1682
1683 # let's begin with all OK
1684 instantiated_ok = True
1685 # let's begin with RO 'running' status (later we can change it)
1686 db_nsr_update["operational-status"] = "running"
1687 # let's begin with VCA 'configured' status (later we can change it)
1688 db_nsr_update["config-status"] = "configured"
1689
1690 if task_instantiation_list:
1691 # wait for all tasks completion
1692 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1693
1694 for task in pending:
1695 instantiated_ok = False
1696 if task == task_ro:
1697 # RO task is pending
1698 db_nsr_update["operational-status"] = "failed"
1699 elif task == task_kdu:
1700 # KDU task is pending
1701 db_nsr_update["operational-status"] = "failed"
1702 else:
1703 # A N2VC task is pending
1704 db_nsr_update["config-status"] = "failed"
1705 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1706 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1707 for task in done:
1708 if task.cancelled():
1709 instantiated_ok = False
1710 if task == task_ro:
1711 # RO task was cancelled
1712 db_nsr_update["operational-status"] = "failed"
1713 elif task == task_kdu:
1714 # KDU task was cancelled
1715 db_nsr_update["operational-status"] = "failed"
1716 else:
1717 # A N2VC was cancelled
1718 db_nsr_update["config-status"] = "failed"
1719 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1720 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1721 else:
1722 exc = task.exception()
1723 if exc:
1724 instantiated_ok = False
1725 if task == task_ro:
1726 # RO task raised an exception
1727 db_nsr_update["operational-status"] = "failed"
1728 elif task == task_kdu:
1729 # KDU task raised an exception
1730 db_nsr_update["operational-status"] = "failed"
1731 else:
1732 # A N2VC task raised an exception
1733 db_nsr_update["config-status"] = "failed"
1734 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1735
1736 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1737 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1738 else:
1739 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1740 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1741 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1742 else:
1743 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1744
1745 if error_text_list:
1746 error_text = "\n".join(error_text_list)
1747 db_nsr_update["detailed-status"] = error_text
1748 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1749 db_nslcmop_update["detailed-status"] = error_text
1750 db_nslcmop_update["statusEnteredTime"] = time()
1751 else:
1752 # all is done
1753 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1754 db_nslcmop_update["statusEnteredTime"] = time()
1755 db_nslcmop_update["detailed-status"] = "done"
1756 db_nsr_update["detailed-status"] = "done"
1757
1758 except (ROclient.ROClientException, DbException, LcmException) as e:
1759 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1760 exc = e
1761 except asyncio.CancelledError:
1762 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1763 exc = "Operation was cancelled"
1764 except Exception as e:
1765 exc = traceback.format_exc()
1766 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1767 exc_info=True)
1768 finally:
1769 if exc:
1770 if db_nsr:
1771 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1772 db_nsr_update["operational-status"] = "failed"
1773 db_nsr_update["config-status"] = "failed"
1774 if db_nslcmop:
1775 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1776 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1777 db_nslcmop_update["statusEnteredTime"] = time()
1778 try:
1779 if db_nsr:
1780 db_nsr_update["_admin.nslcmop"] = None
1781 db_nsr_update["_admin.current-operation"] = None
1782 db_nsr_update["_admin.operation-type"] = None
1783 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1784
1785 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1786 ns_state = None
1787 error_description = None
1788 if instantiated_ok:
1789 ns_state = "READY"
1790 else:
1791 ns_state = "BROKEN"
1792 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1793
1794 self._write_ns_status(
1795 nsr_id=nsr_id,
1796 ns_state=ns_state,
1797 current_operation="IDLE",
1798 current_operation_id=None,
1799 error_description=error_description
1800 )
1801
1802 self._write_op_status(
1803 op_id=nslcmop_id,
1804 error_message=error_description
1805 )
1806
1807 if db_nslcmop_update:
1808 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1809
1810 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1811
1812 except DbException as e:
1813 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1814
1815 if nslcmop_operation_state:
1816 try:
1817 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1818 "operationState": nslcmop_operation_state},
1819 loop=self.loop)
1820 except Exception as e:
1821 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1822
1823 self.logger.debug(logging_text + "Exit")
1824 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1825
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """
    Launch the installation of every KDU listed in the 'kdur' section of the given VNF records and wait for
    all the installations to finish.
    One entry per KDU is stored at nsrs '_admin.deployed.K8s.<index>'. The outcome of each installation is
    written under that same entry: 'kdu-instance' on success, 'detailed-status' with the error otherwise.
    :param logging_text: prefix used for the log messages
    :param nsr_id: _id of the nsrs record to update
    :param db_nsr: ns record. Not used by this method
    :param db_vnfrs: dictionary whose values are vnfr records; each one may contain a 'kdur' list
    :return: None
    :raises LcmException: if any installation fails or does not finish on time, or on any unexpected error
    """
    deployed_ok = True
    # Assigned before the 'try' so that the exception handler below can always reference it
    step = "Initializing KDU deployment"

    # Cache of the cluster ids already resolved: {cluster_type: {k8scluster _id: id at _admin.<cluster_type>.id}}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # Obtain the id stored at k8sclusters '_admin.<cluster_type>.id', reading the database only once
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Maps every installation task to the database path prefix where its outcome must be stored
        pending_tasks = {}
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"
                if not error_text:
                    try:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                    except LcmException as e:
                        error_text = str(e)
                        deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                # Every KDU takes its own slot, also the ones that cannot be launched; otherwise the entry
                # describing the error would be overwritten by the next KDU
                db_path = "_admin.deployed.K8s.{}".format(index)
                index += 1

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update[db_path] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": db_path}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = db_path + "."

        # NOTE(review): this early exit also skips the 'deployed_ok' check below, so a KDU that could not be
        # launched does not raise when no other installation was started - confirm this is intended
        if not pending_tasks:
            return
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:   # timeout
                for task in pending_list:
                    # pending_tasks is a dictionary, it must be indexed and not called
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                deployed_ok = False
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                    deployed_ok = False
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
    finally:
        # store whatever outcome is still pending to be written, also when exiting with an exception
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1937
1938 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1939 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1940 base_folder, task_instantiation_list, task_instantiation_info):
1941 # launch instantiate_N2VC in a asyncio task and register task object
1942 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1943 # if not found, create one entry and update database
1944
1945 # fill db_nsr._admin.deployed.VCA.<index>
1946 vca_index = -1
1947 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1948 if not vca_deployed:
1949 continue
1950 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1951 vca_deployed.get("vdu_id") == vdu_id and \
1952 vca_deployed.get("kdu_name") == kdu_name and \
1953 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1954 break
1955 else:
1956 # not found, create one.
1957 vca_deployed = {
1958 "member-vnf-index": member_vnf_index,
1959 "vdu_id": vdu_id,
1960 "kdu_name": kdu_name,
1961 "vdu_count_index": vdu_index,
1962 "operational-status": "init", # TODO revise
1963 "detailed-status": "", # TODO revise
1964 "step": "initial-deploy", # TODO revise
1965 "vnfd_id": vnfd_id,
1966 "vdu_name": vdu_name,
1967 }
1968 vca_index += 1
1969
1970 # create VCA and configurationStatus in db
1971 db_dict = {
1972 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
1973 "configurationStatus.{}".format(vca_index): dict()
1974 }
1975 self.update_db_2("nsrs", nsr_id, db_dict)
1976
1977 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1978
1979 # Launch task
1980 task_n2vc = asyncio.ensure_future(
1981 self.instantiate_N2VC(
1982 logging_text=logging_text,
1983 vca_index=vca_index,
1984 nsi_id=nsi_id,
1985 db_nsr=db_nsr,
1986 db_vnfr=db_vnfr,
1987 vdu_id=vdu_id,
1988 kdu_name=kdu_name,
1989 vdu_index=vdu_index,
1990 deploy_params=deploy_params,
1991 config_descriptor=descriptor_config,
1992 base_folder=base_folder,
1993 nslcmop_id=nslcmop_id
1994 )
1995 )
1996 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1997 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1998 task_instantiation_list.append(task_n2vc)
1999
2000 # Check if this VNFD has a configured terminate action
2001 def _has_terminate_config_primitive(self, vnfd):
2002 vnf_config = vnfd.get("vnf-configuration")
2003 if vnf_config and vnf_config.get("terminate-config-primitive"):
2004 return True
2005 else:
2006 return False
2007
2008 @staticmethod
2009 def _get_terminate_config_primitive_seq_list(vnfd):
2010 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2011 # No need to check for existing primitive twice, already done before
2012 vnf_config = vnfd.get("vnf-configuration")
2013 seq_list = vnf_config.get("terminate-config-primitive")
2014 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2015 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2016 return seq_list_sorted
2017
2018 @staticmethod
2019 def _create_nslcmop(nsr_id, operation, params):
2020 """
2021 Creates a ns-lcm-opp content to be stored at database.
2022 :param nsr_id: internal id of the instance
2023 :param operation: instantiate, terminate, scale, action, ...
2024 :param params: user parameters for the operation
2025 :return: dictionary following SOL005 format
2026 """
2027 # Raise exception if invalid arguments
2028 if not (nsr_id and operation and params):
2029 raise LcmException(
2030 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2031 now = time()
2032 _id = str(uuid4())
2033 nslcmop = {
2034 "id": _id,
2035 "_id": _id,
2036 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2037 "operationState": "PROCESSING",
2038 "statusEnteredTime": now,
2039 "nsInstanceId": nsr_id,
2040 "lcmOperationType": operation,
2041 "startTime": now,
2042 "isAutomaticInvocation": False,
2043 "operationParams": params,
2044 "isCancelPending": False,
2045 "links": {
2046 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2047 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2048 }
2049 }
2050 return nslcmop
2051
2052 def _format_additional_params(self, params):
2053 params = params or {}
2054 for key, value in params.items():
2055 if str(value).startswith("!!yaml "):
2056 params[key] = yaml.safe_load(value[7:])
2057 return params
2058
2059 def _get_terminate_primitive_params(self, seq, vnf_index):
2060 primitive = seq.get('name')
2061 primitive_params = {}
2062 params = {
2063 "member_vnf_index": vnf_index,
2064 "primitive": primitive,
2065 "primitive_params": primitive_params,
2066 }
2067 desc_params = {}
2068 return self._map_primitive_params(seq, params, desc_params)
2069
2070 # sub-operations
2071
2072 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2073 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2074 if (op.get('operationState') == 'COMPLETED'):
2075 # b. Skip sub-operation
2076 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2077 return self.SUBOPERATION_STATUS_SKIP
2078 else:
2079 # c. Reintent executing sub-operation
2080 # The sub-operation exists, and operationState != 'COMPLETED'
2081 # Update operationState = 'PROCESSING' to indicate a reintent.
2082 operationState = 'PROCESSING'
2083 detailed_status = 'In progress'
2084 self._update_suboperation_status(
2085 db_nslcmop, op_index, operationState, detailed_status)
2086 # Return the sub-operation index
2087 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2088 # with arguments extracted from the sub-operation
2089 return op_index
2090
2091 # Find a sub-operation where all keys in a matching dictionary must match
2092 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2093 def _find_suboperation(self, db_nslcmop, match):
2094 if (db_nslcmop and match):
2095 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2096 for i, op in enumerate(op_list):
2097 if all(op.get(k) == match[k] for k in match):
2098 return i
2099 return self.SUBOPERATION_STATUS_NOT_FOUND
2100
2101 # Update status for a sub-operation given its index
2102 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2103 # Update DB for HA tasks
2104 q_filter = {'_id': db_nslcmop['_id']}
2105 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2106 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2107 self.db.set_one("nslcmops",
2108 q_filter=q_filter,
2109 update_dict=update_dict,
2110 fail_on_empty=False)
2111
2112 # Add sub-operation, return the index of the added sub-operation
2113 # Optionally, set operationState, detailed-status, and operationType
2114 # Status and type are currently set for 'scale' sub-operations:
2115 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2116 # 'detailed-status' : status message
2117 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2118 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2119 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2120 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2121 RO_nsr_id=None, RO_scaling_info=None):
2122 if not (db_nslcmop):
2123 return self.SUBOPERATION_STATUS_NOT_FOUND
2124 # Get the "_admin.operations" list, if it exists
2125 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2126 op_list = db_nslcmop_admin.get('operations')
2127 # Create or append to the "_admin.operations" list
2128 new_op = {'member_vnf_index': vnf_index,
2129 'vdu_id': vdu_id,
2130 'vdu_count_index': vdu_count_index,
2131 'primitive': primitive,
2132 'primitive_params': mapped_primitive_params}
2133 if operationState:
2134 new_op['operationState'] = operationState
2135 if detailed_status:
2136 new_op['detailed-status'] = detailed_status
2137 if operationType:
2138 new_op['lcmOperationType'] = operationType
2139 if RO_nsr_id:
2140 new_op['RO_nsr_id'] = RO_nsr_id
2141 if RO_scaling_info:
2142 new_op['RO_scaling_info'] = RO_scaling_info
2143 if not op_list:
2144 # No existing operations, create key 'operations' with current operation as first list element
2145 db_nslcmop_admin.update({'operations': [new_op]})
2146 op_list = db_nslcmop_admin.get('operations')
2147 else:
2148 # Existing operations, append operation to list
2149 op_list.append(new_op)
2150
2151 db_nslcmop_update = {'_admin.operations': op_list}
2152 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2153 op_index = len(op_list) - 1
2154 return op_index
2155
2156 # Helper methods for scale() sub-operations
2157
2158 # pre-scale/post-scale:
2159 # Check for 3 different cases:
2160 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2161 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2162 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2163 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2164 operationType, RO_nsr_id=None, RO_scaling_info=None):
2165 # Find this sub-operation
2166 if (RO_nsr_id and RO_scaling_info):
2167 operationType = 'SCALE-RO'
2168 match = {
2169 'member_vnf_index': vnf_index,
2170 'RO_nsr_id': RO_nsr_id,
2171 'RO_scaling_info': RO_scaling_info,
2172 }
2173 else:
2174 match = {
2175 'member_vnf_index': vnf_index,
2176 'primitive': vnf_config_primitive,
2177 'primitive_params': primitive_params,
2178 'lcmOperationType': operationType
2179 }
2180 op_index = self._find_suboperation(db_nslcmop, match)
2181 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2182 # a. New sub-operation
2183 # The sub-operation does not exist, add it.
2184 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2185 # The following parameters are set to None for all kind of scaling:
2186 vdu_id = None
2187 vdu_count_index = None
2188 vdu_name = None
2189 if (RO_nsr_id and RO_scaling_info):
2190 vnf_config_primitive = None
2191 primitive_params = None
2192 else:
2193 RO_nsr_id = None
2194 RO_scaling_info = None
2195 # Initial status for sub-operation
2196 operationState = 'PROCESSING'
2197 detailed_status = 'In progress'
2198 # Add sub-operation for pre/post-scaling (zero or more operations)
2199 self._add_suboperation(db_nslcmop,
2200 vnf_index,
2201 vdu_id,
2202 vdu_count_index,
2203 vdu_name,
2204 vnf_config_primitive,
2205 primitive_params,
2206 operationState,
2207 detailed_status,
2208 operationType,
2209 RO_nsr_id,
2210 RO_scaling_info)
2211 return self.SUBOPERATION_STATUS_NEW
2212 else:
2213 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2214 # or op_index (operationState != 'COMPLETED')
2215 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2216
2217 # Function to return execution_environment id
2218
2219 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2220 for vca in vca_deployed_list:
2221 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2222 return vca["ee_id"]
2223
2224 # Helper methods for terminate()
2225
2226 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2227 """ Create a primitive with params from VNFD
2228 Called from terminate() before deleting instance
2229 Calls action() to execute the primitive """
2230 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2231 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2232 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2233 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2234 db_vnfds = {}
2235 # Loop over VNFRs
2236 for vnfr in db_vnfrs_list:
2237 vnfd_id = vnfr["vnfd-id"]
2238 vnf_index = vnfr["member-vnf-index-ref"]
2239 if vnfd_id not in db_vnfds:
2240 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2241 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2242 db_vnfds[vnfd_id] = vnfd
2243 vnfd = db_vnfds[vnfd_id]
2244 if not self._has_terminate_config_primitive(vnfd):
2245 continue
2246 # Get the primitive's sorted sequence list
2247 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2248 for seq in seq_list:
2249 # For each sequence in list, get primitive and call _ns_execute_primitive()
2250 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2251 vnf_index, seq.get("name"))
2252 self.logger.debug(logging_text + step)
2253 # Create the primitive for each sequence, i.e. "primitive": "touch"
2254 primitive = seq.get('name')
2255 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2256 # The following 3 parameters are currently set to None for 'terminate':
2257 # vdu_id, vdu_count_index, vdu_name
2258 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2259 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2260 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2261 # Add sub-operation
2262 self._add_suboperation(db_nslcmop,
2263 nslcmop_id,
2264 vnf_index,
2265 vdu_id,
2266 vdu_count_index,
2267 vdu_name,
2268 primitive,
2269 mapped_primitive_params)
2270 # Sub-operations: Call _ns_execute_primitive() instead of action()
2271 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2272 # nsr_deployed = db_nsr["_admin"]["deployed"]
2273
2274 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2275 # nsr_id, nslcmop_terminate_action_id)
2276 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2277 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2278 # if result not in result_ok:
2279 # raise LcmException(
2280 # "terminate_primitive_action for vnf_member_index={}",
2281 # " primitive={} fails with error {}".format(
2282 # vnf_index, seq.get("name"), result_detail))
2283
2284 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2285 try:
2286 await self.n2vc.exec_primitive(
2287 ee_id=ee_id,
2288 primitive_name=primitive,
2289 params_dict=mapped_primitive_params
2290 )
2291 except Exception as e:
2292 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2293 raise LcmException(
2294 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2295 .format(vnf_index, seq.get("name"), e),
2296 )
2297
2298 async def _delete_N2VC(self, nsr_id: str):
2299 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2300 namespace = "." + nsr_id
2301 await self.n2vc.delete_namespace(namespace=namespace)
2302 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2303
2304 async def terminate(self, nsr_id, nslcmop_id):
2305
2306 # Try to lock HA task here
2307 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2308 if not task_is_locked_by_me:
2309 return
2310
2311 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2312 self.logger.debug(logging_text + "Enter")
2313 db_nsr = None
2314 db_nslcmop = None
2315 exc = None
2316 failed_detail = [] # annotates all failed error messages
2317 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2318 "_admin.current-operation": nslcmop_id,
2319 "_admin.operation-type": "terminate"}
2320 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2321 db_nslcmop_update = {}
2322 nslcmop_operation_state = None
2323 autoremove = False # autoremove after terminated
2324 pending_tasks = []
2325 try:
2326 # wait for any previous tasks in process
2327 step = "Waiting for previous operations to terminate"
2328 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2329
2330 self._write_ns_status(
2331 nsr_id=nsr_id,
2332 ns_state="TERMINATING",
2333 current_operation="TERMINATING",
2334 current_operation_id=nslcmop_id
2335 )
2336 self._write_op_status(
2337 op_id=nslcmop_id,
2338 queuePosition=0
2339 )
2340
2341 step = "Getting nslcmop={} from db".format(nslcmop_id)
2342 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2343 step = "Getting nsr={} from db".format(nsr_id)
2344 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2345 # nsd = db_nsr["nsd"]
2346 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2347 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2348 return
2349 # #TODO check if VIM is creating and wait
2350 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2351 # Call internal terminate action
2352 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2353
2354 pending_tasks = []
2355
2356 db_nsr_update["operational-status"] = "terminating"
2357 db_nsr_update["config-status"] = "terminating"
2358
2359 # remove NS
2360 try:
2361 step = "delete execution environment"
2362 self.logger.debug(logging_text + step)
2363
2364 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2365 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2366
2367 pending_tasks.append(task_delete_ee)
2368 except Exception as e:
2369 msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
2370 self.logger.error(msg)
2371 failed_detail.append(msg)
2372
2373 try:
2374 # Delete from k8scluster
2375 step = "delete kdus"
2376 self.logger.debug(logging_text + step)
2377 # print(nsr_deployed)
2378 if nsr_deployed:
2379 for kdu in nsr_deployed.get("K8s", ()):
2380 kdu_instance = kdu.get("kdu-instance")
2381 if not kdu_instance:
2382 continue
2383 if kdu.get("k8scluster-type") == "chart":
2384 task_delete_kdu_instance = asyncio.ensure_future(
2385 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2386 kdu_instance=kdu_instance))
2387 elif kdu.get("k8scluster-type") == "juju":
2388 task_delete_kdu_instance = asyncio.ensure_future(
2389 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2390 kdu_instance=kdu_instance))
2391 else:
2392 self.error(logging_text + "Unknown k8s deployment type {}".
2393 format(kdu.get("k8scluster-type")))
2394 continue
2395 pending_tasks.append(task_delete_kdu_instance)
2396 except LcmException as e:
2397 msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
2398 self.logger.error(msg)
2399 failed_detail.append(msg)
2400
2401 # remove from RO
2402 RO_fail = False
2403
2404 # Delete ns
2405 RO_nsr_id = RO_delete_action = None
2406 if nsr_deployed and nsr_deployed.get("RO"):
2407 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2408 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2409 try:
2410 if RO_nsr_id:
2411 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2412 "Deleting ns from VIM"
2413 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2414 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2415 self.logger.debug(logging_text + step)
2416 desc = await self.RO.delete("ns", RO_nsr_id)
2417 RO_delete_action = desc["action_id"]
2418 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2419 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2420 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2421 if RO_delete_action:
2422 # wait until NS is deleted from VIM
2423 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2424 format(RO_nsr_id, RO_delete_action)
2425 detailed_status_old = None
2426 self.logger.debug(logging_text + step)
2427
2428 delete_timeout = 20 * 60 # 20 minutes
2429 while delete_timeout > 0:
2430 desc = await self.RO.show(
2431 "ns",
2432 item_id_name=RO_nsr_id,
2433 extra_item="action",
2434 extra_item_id=RO_delete_action)
2435
2436 # deploymentStatus
2437 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2438
2439 ns_status, ns_status_info = self.RO.check_action_status(desc)
2440 if ns_status == "ERROR":
2441 raise ROclient.ROClientException(ns_status_info)
2442 elif ns_status == "BUILD":
2443 detailed_status = step + "; {}".format(ns_status_info)
2444 elif ns_status == "ACTIVE":
2445 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2446 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2447 break
2448 else:
2449 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2450 if detailed_status != detailed_status_old:
2451 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2452 db_nsr_update["detailed-status"] = detailed_status
2453 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2454 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2455 await asyncio.sleep(5, loop=self.loop)
2456 delete_timeout -= 5
2457 else: # delete_timeout <= 0:
2458 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2459
2460 except ROclient.ROClientException as e:
2461 if e.http_code == 404: # not found
2462 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2463 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2464 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2465 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2466 elif e.http_code == 409: # conflict
2467 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2468 self.logger.debug(logging_text + failed_detail[-1])
2469 RO_fail = True
2470 else:
2471 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2472 self.logger.error(logging_text + failed_detail[-1])
2473 RO_fail = True
2474
2475 # Delete nsd
2476 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2477 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2478 try:
2479 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2480 "Deleting nsd from RO"
2481 await self.RO.delete("nsd", RO_nsd_id)
2482 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2483 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2484 except ROclient.ROClientException as e:
2485 if e.http_code == 404: # not found
2486 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2487 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2488 elif e.http_code == 409: # conflict
2489 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2490 self.logger.debug(logging_text + failed_detail[-1])
2491 RO_fail = True
2492 else:
2493 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2494 self.logger.error(logging_text + failed_detail[-1])
2495 RO_fail = True
2496
2497 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2498 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2499 if not vnf_deployed or not vnf_deployed["id"]:
2500 continue
2501 try:
2502 RO_vnfd_id = vnf_deployed["id"]
2503 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2504 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2505 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2506 await self.RO.delete("vnfd", RO_vnfd_id)
2507 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2508 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2509 except ROclient.ROClientException as e:
2510 if e.http_code == 404: # not found
2511 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2512 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2513 elif e.http_code == 409: # conflict
2514 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2515 self.logger.debug(logging_text + failed_detail[-1])
2516 else:
2517 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2518 self.logger.error(logging_text + failed_detail[-1])
2519
2520 if failed_detail:
2521 terminate_ok = False
2522 self.logger.error(logging_text + " ;".join(failed_detail))
2523 db_nsr_update["operational-status"] = "failed"
2524 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2525 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2526 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2527 db_nslcmop_update["statusEnteredTime"] = time()
2528 else:
2529 terminate_ok = True
2530 db_nsr_update["operational-status"] = "terminated"
2531 db_nsr_update["detailed-status"] = "Done"
2532 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2533 db_nslcmop_update["detailed-status"] = "Done"
2534 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2535 db_nslcmop_update["statusEnteredTime"] = time()
2536 if db_nslcmop["operationParams"].get("autoremove"):
2537 autoremove = True
2538
2539 except (ROclient.ROClientException, DbException, LcmException) as e:
2540 self.logger.error(logging_text + "Exit Exception {}".format(e))
2541 exc = e
2542 except asyncio.CancelledError:
2543 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2544 exc = "Operation was cancelled"
2545 except Exception as e:
2546 exc = traceback.format_exc()
2547 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2548 finally:
2549 if exc and db_nslcmop:
2550 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2551 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2552 db_nslcmop_update["statusEnteredTime"] = time()
2553 try:
2554 if db_nslcmop and db_nslcmop_update:
2555 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2556 if db_nsr:
2557 db_nsr_update["_admin.nslcmop"] = None
2558 db_nsr_update["_admin.current-operation"] = None
2559 db_nsr_update["_admin.operation-type"] = None
2560 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2561
2562 if terminate_ok:
2563 ns_state = "IDLE"
2564 error_description = None
2565 error_detail = None
2566 else:
2567 ns_state = "BROKEN"
2568 error_detail = "; ".join(failed_detail)
2569 error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
2570 .format(nslcmop_id, step, error_detail)
2571
2572 self._write_ns_status(
2573 nsr_id=nsr_id,
2574 ns_state=ns_state,
2575 current_operation="IDLE",
2576 current_operation_id=None,
2577 error_description=error_description
2578 )
2579
2580 self._write_op_status(
2581 op_id=nslcmop_id,
2582 error_message=error_description
2583 )
2584
2585 except DbException as e:
2586 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2587 if nslcmop_operation_state:
2588 try:
2589 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2590 "operationState": nslcmop_operation_state,
2591 "autoremove": autoremove},
2592 loop=self.loop)
2593 except Exception as e:
2594 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2595
2596 # wait for pending tasks
2597 done = None
2598 pending = None
2599 if pending_tasks:
2600 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2601 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2602 if not pending:
2603 self.logger.debug(logging_text + 'All tasks finished...')
2604 else:
2605 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2606
2607 self.logger.debug(logging_text + "Exit")
2608 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2609
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If the user does not provide a
    parameter, the descriptor "value" (or, if absent, "default-value") is used. If that value is written
    between < >, the enclosed name is looked up at instantiation_params.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither a user value nor a descriptor value, or if a <name>
        reference is not present at instantiation_params
    """
    # Callers may hand over None (e.g. an operation without primitive_params); normalize both inputs so that
    # the membership tests below cannot fail with TypeError
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" takes precedence over "default-value" when both are present
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                reference = value[1:-1]
                if reference not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[reference]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            # structured values are serialized to a one-line (flow style) yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # the text after the "!!yaml " marker is passed as it is
            value = value[7:]
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
2652
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
    """
    Executes a primitive over the execution environment of the deployed charm that matches the given target.
    Errors are not raised; they are reported at the returned tuple.
    :param db_deployed: dictionary with a "VCA" list of deployed charm records (empty entries are skipped)
    :param member_vnf_index: must be equal to the "member-vnf-index" of the record
    :param vdu_id: must be equal to the "vdu_id" of the record
    :param vdu_name: optional filter over "vdu_name"; not applied if empty
    :param vdu_count_index: optional filter over "vdu_count_index"; not applied if None. 0 is a valid index
    :param primitive: name of the primitive. For "config" the params are sent as {"params": primitive_params}
    :param primitive_params: dictionary with the params for the primitive
    :param retries: number of extra attempts if the execution fails
    :param retries_interval: seconds to wait between attempts
    :return: (output of the primitive, 'OK') or (error text, 'FAIL')
    """

    # find vca_deployed record for this action
    try:
        for vca_deployed in db_deployed["VCA"]:
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # compare against None: index 0 is falsy, and a truthiness check would skip the filter for it
            if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            # vca_deployed not found
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        # get ee_id
        ee_id = vca_deployed.get("ee_id")
        if not ee_id:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "execution environment"
                               .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        if primitive == "config":
            primitive_params = {"params": primitive_params}

        while retries >= 0:
            try:
                output = await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=primitive_params
                )
                # execution was OK
                break
            except Exception as e:
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry. The 'loop' argument is not passed: asyncio.sleep rejects it since
                    # python 3.10 and, inside a coroutine, the running loop is used anyway
                    await asyncio.sleep(retries_interval)
                else:
                    return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'

        return output, 'OK'

    except Exception as e:
        # any other problem (record not found, missing ee_id, ...) is reported as a failed execution
        return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2705
async def action(self, nsr_id, nslcmop_id):
    """
    Executes the primitive requested by an 'action' nslcmop and stores the result at database.

    The target is taken from the nslcmop "operationParams":
      - 'vdu_id': the primitive is looked for at the vdu-configuration of that vdu of the vnfd
      - else 'kdu_name': 'upgrade', 'rollback' or 'status' is run over the deployed KDU using the helm
        ("chart") or juju k8s connector, and the method finishes without executing any charm primitive
      - else 'member_vnf_index': the primitive is looked for at the vnf-configuration of the vnfd
      - else: the primitive is looked for at the ns-configuration of the nsd
    Except for KDUs, the primitive is executed with _ns_execute_primitive. The result is written at
    "nslcmops", the operation markers of the nsr are cleaned and, if an operation state has been set, a
    kafka message "ns"/"actioned" is sent.
    :param nsr_id: _id of the nsr
    :param nslcmop_id: _id of the nslcmop
    :return: None if lock_HA does not grant this task; otherwise the tuple (operation state, detail).
        Exceptions raised while processing are not propagated; they are reported as a 'FAILED' state
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")  # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if kdur is None:
                # report a meaningful error instead of failing with AttributeError over None
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            index = 0
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if kdu_name == kdu["kdu-name"]:
                    db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                               "path": "_admin.deployed.K8s.{}".format(index)}
                    if primitive == "upgrade":
                        if desc_params.get("kdu_model"):
                            kdu_model = desc_params.get("kdu_model")
                            del desc_params["kdu_model"]
                        else:
                            kdu_model = kdu.get("kdu-model")
                            # a stored model with the form "<name>:<other>" is reduced to "<name>"
                            parts = kdu_model.split(sep=":")
                            if len(parts) == 2:
                                kdu_model = parts[0]

                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)

                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)

                        self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                        break
                    elif primitive == "rollback":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                    elif primitive == "status":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                index += 1

            else:
                # reached when no deployed KDU has executed one of the supported primitives
                raise LcmException("KDU '{}' not found".format(kdu_name))
            # The operation state and detail are also stored at the local variables, so that the 'finally'
            # block sends the kafka notification and returns the real result for KDU actions too
            if output:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = output
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'COMPLETED'
                db_nslcmop_update["statusEnteredTime"] = time()
            else:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = ''
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'FAILED'
                db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    # report a meaningful error instead of failing with AttributeError over None
                    raise LcmException("vdu_id={} not found at vnfr of member_vnf_index={}".format(
                        vdu_id, vnf_index))
                # additional params at vdu level, if any, replace the ones at vnf level
                if vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        # 'primitive_params' can be empty; a dictionary is always provided to the params mapping
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params or {},
                                                        desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # clean the operation markers written at the nsr when the task started
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
                if exc:
                    self._write_op_status(
                        op_id=nslcmop_id,
                        error_message=nslcmop_operation_state_detail
                    )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # returning from 'finally' is what makes the early 'return' statements above yield this tuple
        return nslcmop_operation_state, nslcmop_operation_state_detail
2954
2955 async def scale(self, nsr_id, nslcmop_id):
2956
2957 # Try to lock HA task here
2958 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2959 if not task_is_locked_by_me:
2960 return
2961
2962 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2963 self.logger.debug(logging_text + "Enter")
2964 # get all needed from database
2965 db_nsr = None
2966 db_nslcmop = None
2967 db_nslcmop_update = {}
2968 nslcmop_operation_state = None
2969 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2970 "_admin.current-operation": nslcmop_id,
2971 "_admin.operation-type": "scale"}
2972 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2973 exc = None
2974 # in case of error, indicates what part of scale was failed to put nsr at error status
2975 scale_process = None
2976 old_operational_status = ""
2977 old_config_status = ""
2978 vnfr_scaled = False
2979 try:
2980 # wait for any previous tasks in process
2981 step = "Waiting for previous operations to terminate"
2982 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2983
2984 self._write_ns_status(
2985 nsr_id=nsr_id,
2986 ns_state=None,
2987 current_operation="SCALING",
2988 current_operation_id=nslcmop_id
2989 )
2990
2991 step = "Getting nslcmop from database"
2992 self.logger.debug(step + " after having waited for previous tasks to be completed")
2993 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2994 step = "Getting nsr from database"
2995 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2996
2997 old_operational_status = db_nsr["operational-status"]
2998 old_config_status = db_nsr["config-status"]
2999 step = "Parsing scaling parameters"
3000 # self.logger.debug(step)
3001 db_nsr_update["operational-status"] = "scaling"
3002 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3003 nsr_deployed = db_nsr["_admin"].get("deployed")
3004
3005 #######
3006 nsr_deployed = db_nsr["_admin"].get("deployed")
3007 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3008 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3009 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3010 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3011 #######
3012
3013 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3014 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3015 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3016 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3017 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3018
3019 # for backward compatibility
3020 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3021 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3022 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3023 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3024
3025 step = "Getting vnfr from database"
3026 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3027 step = "Getting vnfd from database"
3028 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3029
3030 step = "Getting scaling-group-descriptor"
3031 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3032 if scaling_descriptor["name"] == scaling_group:
3033 break
3034 else:
3035 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3036 "at vnfd:scaling-group-descriptor".format(scaling_group))
3037
3038 # cooldown_time = 0
3039 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3040 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3041 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3042 # break
3043
3044 # TODO check if ns is in a proper status
3045 step = "Sending scale order to VIM"
3046 nb_scale_op = 0
3047 if not db_nsr["_admin"].get("scaling-group"):
3048 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3049 admin_scale_index = 0
3050 else:
3051 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3052 if admin_scale_info["name"] == scaling_group:
3053 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3054 break
3055 else: # not found, set index one plus last element and add new entry with the name
3056 admin_scale_index += 1
3057 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3058 RO_scaling_info = []
3059 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3060 if scaling_type == "SCALE_OUT":
3061 # count if max-instance-count is reached
3062 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3063 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3064 if nb_scale_op >= max_instance_count:
3065 raise LcmException("reached the limit of {} (max-instance-count) "
3066 "scaling-out operations for the "
3067 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3068
3069 nb_scale_op += 1
3070 vdu_scaling_info["scaling_direction"] = "OUT"
3071 vdu_scaling_info["vdu-create"] = {}
3072 for vdu_scale_info in scaling_descriptor["vdu"]:
3073 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3074 "type": "create", "count": vdu_scale_info.get("count", 1)})
3075 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3076
3077 elif scaling_type == "SCALE_IN":
3078 # count if min-instance-count is reached
3079 min_instance_count = 0
3080 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3081 min_instance_count = int(scaling_descriptor["min-instance-count"])
3082 if nb_scale_op <= min_instance_count:
3083 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3084 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3085 nb_scale_op -= 1
3086 vdu_scaling_info["scaling_direction"] = "IN"
3087 vdu_scaling_info["vdu-delete"] = {}
3088 for vdu_scale_info in scaling_descriptor["vdu"]:
3089 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3090 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3091 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3092
3093 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3094 vdu_create = vdu_scaling_info.get("vdu-create")
3095 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3096 if vdu_scaling_info["scaling_direction"] == "IN":
3097 for vdur in reversed(db_vnfr["vdur"]):
3098 if vdu_delete.get(vdur["vdu-id-ref"]):
3099 vdu_delete[vdur["vdu-id-ref"]] -= 1
3100 vdu_scaling_info["vdu"].append({
3101 "name": vdur["name"],
3102 "vdu_id": vdur["vdu-id-ref"],
3103 "interface": []
3104 })
3105 for interface in vdur["interfaces"]:
3106 vdu_scaling_info["vdu"][-1]["interface"].append({
3107 "name": interface["name"],
3108 "ip_address": interface["ip-address"],
3109 "mac_address": interface.get("mac-address"),
3110 })
3111 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3112
3113 # PRE-SCALE BEGIN
3114 step = "Executing pre-scale vnf-config-primitive"
3115 if scaling_descriptor.get("scaling-config-action"):
3116 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3117 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3118 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3119 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3120 step = db_nslcmop_update["detailed-status"] = \
3121 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3122
3123 # look for primitive
3124 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3125 if config_primitive["name"] == vnf_config_primitive:
3126 break
3127 else:
3128 raise LcmException(
3129 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3130 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3131 "primitive".format(scaling_group, config_primitive))
3132
3133 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3134 if db_vnfr.get("additionalParamsForVnf"):
3135 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3136
3137 scale_process = "VCA"
3138 db_nsr_update["config-status"] = "configuring pre-scaling"
3139 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3140
3141 # Pre-scale reintent check: Check if this sub-operation has been executed before
3142 op_index = self._check_or_add_scale_suboperation(
3143 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3144 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3145 # Skip sub-operation
3146 result = 'COMPLETED'
3147 result_detail = 'Done'
3148 self.logger.debug(logging_text +
3149 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3150 vnf_config_primitive, result, result_detail))
3151 else:
3152 if (op_index == self.SUBOPERATION_STATUS_NEW):
3153 # New sub-operation: Get index of this sub-operation
3154 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3155 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3156 format(vnf_config_primitive))
3157 else:
3158 # Reintent: Get registered params for this existing sub-operation
3159 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3160 vnf_index = op.get('member_vnf_index')
3161 vnf_config_primitive = op.get('primitive')
3162 primitive_params = op.get('primitive_params')
3163 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3164 format(vnf_config_primitive))
3165 # Execute the primitive, either with new (first-time) or registered (reintent) args
3166 result, result_detail = await self._ns_execute_primitive(
3167 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3168 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3169 vnf_config_primitive, result, result_detail))
3170 # Update operationState = COMPLETED | FAILED
3171 self._update_suboperation_status(
3172 db_nslcmop, op_index, result, result_detail)
3173
3174 if result == "FAILED":
3175 raise LcmException(result_detail)
3176 db_nsr_update["config-status"] = old_config_status
3177 scale_process = None
3178 # PRE-SCALE END
3179
3180 # SCALE RO - BEGIN
3181 # Should this block be skipped if 'RO_nsr_id' == None ?
3182 # if (RO_nsr_id and RO_scaling_info):
3183 if RO_scaling_info:
3184 scale_process = "RO"
3185 # Scale RO reintent check: Check if this sub-operation has been executed before
3186 op_index = self._check_or_add_scale_suboperation(
3187 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3188 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3189 # Skip sub-operation
3190 result = 'COMPLETED'
3191 result_detail = 'Done'
3192 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3193 result, result_detail))
3194 else:
3195 if (op_index == self.SUBOPERATION_STATUS_NEW):
3196 # New sub-operation: Get index of this sub-operation
3197 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3198 self.logger.debug(logging_text + "New sub-operation RO")
3199 else:
3200 # Reintent: Get registered params for this existing sub-operation
3201 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3202 RO_nsr_id = op.get('RO_nsr_id')
3203 RO_scaling_info = op.get('RO_scaling_info')
3204 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3205 vnf_config_primitive))
3206
3207 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3208 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3209 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3210 # wait until ready
3211 RO_nslcmop_id = RO_desc["instance_action_id"]
3212 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3213
3214 RO_task_done = False
3215 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3216 detailed_status_old = None
3217 self.logger.debug(logging_text + step)
3218
3219 deployment_timeout = 1 * 3600 # One hour
3220 while deployment_timeout > 0:
3221 if not RO_task_done:
3222 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3223 extra_item_id=RO_nslcmop_id)
3224
3225 # deploymentStatus
3226 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3227
3228 ns_status, ns_status_info = self.RO.check_action_status(desc)
3229 if ns_status == "ERROR":
3230 raise ROclient.ROClientException(ns_status_info)
3231 elif ns_status == "BUILD":
3232 detailed_status = step + "; {}".format(ns_status_info)
3233 elif ns_status == "ACTIVE":
3234 RO_task_done = True
3235 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3236 self.logger.debug(logging_text + step)
3237 else:
3238 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3239 else:
3240
3241 if ns_status == "ERROR":
3242 raise ROclient.ROClientException(ns_status_info)
3243 elif ns_status == "BUILD":
3244 detailed_status = step + "; {}".format(ns_status_info)
3245 elif ns_status == "ACTIVE":
3246 step = detailed_status = \
3247 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3248 if not vnfr_scaled:
3249 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3250 vnfr_scaled = True
3251 try:
3252 desc = await self.RO.show("ns", RO_nsr_id)
3253
3254 # deploymentStatus
3255 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3256
3257 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3258 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3259 break
3260 except LcmExceptionNoMgmtIP:
3261 pass
3262 else:
3263 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3264 if detailed_status != detailed_status_old:
3265 self._update_suboperation_status(
3266 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3267 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3268 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3269
3270 await asyncio.sleep(5, loop=self.loop)
3271 deployment_timeout -= 5
3272 if deployment_timeout <= 0:
3273 self._update_suboperation_status(
3274 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3275 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3276
3277 # update VDU_SCALING_INFO with the obtained ip_addresses
3278 if vdu_scaling_info["scaling_direction"] == "OUT":
3279 for vdur in reversed(db_vnfr["vdur"]):
3280 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3281 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3282 vdu_scaling_info["vdu"].append({
3283 "name": vdur["name"],
3284 "vdu_id": vdur["vdu-id-ref"],
3285 "interface": []
3286 })
3287 for interface in vdur["interfaces"]:
3288 vdu_scaling_info["vdu"][-1]["interface"].append({
3289 "name": interface["name"],
3290 "ip_address": interface["ip-address"],
3291 "mac_address": interface.get("mac-address"),
3292 })
3293 del vdu_scaling_info["vdu-create"]
3294
3295 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3296 # SCALE RO - END
3297
3298 scale_process = None
3299 if db_nsr_update:
3300 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3301
3302 # POST-SCALE BEGIN
3303 # execute primitive service POST-SCALING
3304 step = "Executing post-scale vnf-config-primitive"
3305 if scaling_descriptor.get("scaling-config-action"):
3306 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3307 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3308 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3309 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3310 step = db_nslcmop_update["detailed-status"] = \
3311 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3312
3313 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3314 if db_vnfr.get("additionalParamsForVnf"):
3315 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3316
3317 # look for primitive
3318 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3319 if config_primitive["name"] == vnf_config_primitive:
3320 break
3321 else:
3322 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3323 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3324 "match any vnf-configuration:config-primitive".format(scaling_group,
3325 config_primitive))
3326 scale_process = "VCA"
3327 db_nsr_update["config-status"] = "configuring post-scaling"
3328 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3329
3330 # Post-scale reintent check: Check if this sub-operation has been executed before
3331 op_index = self._check_or_add_scale_suboperation(
3332 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3333 if op_index == self.SUBOPERATION_STATUS_SKIP:
3334 # Skip sub-operation
3335 result = 'COMPLETED'
3336 result_detail = 'Done'
3337 self.logger.debug(logging_text +
3338 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3339 format(vnf_config_primitive, result, result_detail))
3340 else:
3341 if op_index == self.SUBOPERATION_STATUS_NEW:
3342 # New sub-operation: Get index of this sub-operation
3343 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3344 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3345 format(vnf_config_primitive))
3346 else:
3347 # Reintent: Get registered params for this existing sub-operation
3348 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3349 vnf_index = op.get('member_vnf_index')
3350 vnf_config_primitive = op.get('primitive')
3351 primitive_params = op.get('primitive_params')
3352 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3353 format(vnf_config_primitive))
3354 # Execute the primitive, either with new (first-time) or registered (reintent) args
3355 result, result_detail = await self._ns_execute_primitive(
3356 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3357 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3358 vnf_config_primitive, result, result_detail))
3359 # Update operationState = COMPLETED | FAILED
3360 self._update_suboperation_status(
3361 db_nslcmop, op_index, result, result_detail)
3362
3363 if result == "FAILED":
3364 raise LcmException(result_detail)
3365 db_nsr_update["config-status"] = old_config_status
3366 scale_process = None
3367 # POST-SCALE END
3368
3369 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3370 db_nslcmop_update["statusEnteredTime"] = time()
3371 db_nslcmop_update["detailed-status"] = "done"
3372 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3373 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3374 else old_operational_status
3375 db_nsr_update["config-status"] = old_config_status
3376 return
3377 except (ROclient.ROClientException, DbException, LcmException) as e:
3378 self.logger.error(logging_text + "Exit Exception {}".format(e))
3379 exc = e
3380 except asyncio.CancelledError:
3381 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3382 exc = "Operation was cancelled"
3383 except Exception as e:
3384 exc = traceback.format_exc()
3385 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3386 finally:
3387 self._write_ns_status(
3388 nsr_id=nsr_id,
3389 ns_state=None,
3390 current_operation="IDLE",
3391 current_operation_id=None
3392 )
3393 if exc:
3394 if db_nslcmop:
3395 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3396 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3397 db_nslcmop_update["statusEnteredTime"] = time()
3398 if db_nsr:
3399 db_nsr_update["operational-status"] = old_operational_status
3400 db_nsr_update["config-status"] = old_config_status
3401 db_nsr_update["detailed-status"] = ""
3402 db_nsr_update["_admin.nslcmop"] = None
3403 if scale_process:
3404 if "VCA" in scale_process:
3405 db_nsr_update["config-status"] = "failed"
3406 if "RO" in scale_process:
3407 db_nsr_update["operational-status"] = "failed"
3408 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3409 exc)
3410 try:
3411 if db_nslcmop and db_nslcmop_update:
3412 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3413 if db_nsr:
3414 db_nsr_update["_admin.current-operation"] = None
3415 db_nsr_update["_admin.operation-type"] = None
3416 db_nsr_update["_admin.nslcmop"] = None
3417 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3418
3419 self._write_ns_status(
3420 nsr_id=nsr_id,
3421 ns_state=None,
3422 current_operation="IDLE",
3423 current_operation_id=None
3424 )
3425
3426 except DbException as e:
3427 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3428 if nslcmop_operation_state:
3429 try:
3430 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3431 "operationState": nslcmop_operation_state},
3432 loop=self.loop)
3433 # if cooldown_time:
3434 # await asyncio.sleep(cooldown_time, loop=self.loop)
3435 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3436 except Exception as e:
3437 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3438 self.logger.debug(logging_text + "Exit")
3439 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")