Fix Bug 994: Add fs.sync before deploying charm
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
# Default timeouts used by the class. NOTE(review): values look like seconds - confirm against their users
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 10 * 60  # timeout for primitive execution

# Negative sentinel codes for sub-operation lookups; their users are not visible in this part of the file
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
55
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Builds the NS lifecycle manager: initializes the LcmBase part and creates the VCA (juju), K8s (helm and juju)
    and RO clients
    :param db: database handler, passed to LcmBase
    :param msg: messaging handler, passed to LcmBase
    :param fs: filesystem storage handler, passed to LcmBase
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: two level dictionary with configuration. Keys 'timeout', 'ro_config' and 'VCA' are read here
    :param loop: event loop given to the N2VC connector and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy of the VCA section
    self.vca_config = config["VCA"].copy()
    vca = self.vca_config

    # create N2VC connector
    vca_endpoint = '{}:{}'.format(vca['host'], vca['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_endpoint,
        username=vca.get('user', None),
        vca_config=vca,
        on_update_db=self._on_update_n2vc_db
    )

    # K8s connector based on helm
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=vca.get("kubectlpath"),
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # K8s connector based on juju
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=vca.get("kubectlpath"),
        juju_command=vca.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
107
108 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
109
110 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
111
112 try:
113 # TODO filter RO descriptor fields...
114
115 # write to database
116 db_dict = dict()
117 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
118 db_dict['deploymentStatus'] = ro_descriptor
119 self.update_db_2("nsrs", nsrs_id, db_dict)
120
121 except Exception as e:
122 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
124 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
125
126 # remove last dot from path (if exists)
127 if path.endswith('.'):
128 path = path[:-1]
129
130 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
131 # .format(table, filter, path, updated_data))
132
133 try:
134
135 nsr_id = filter.get('_id')
136
137 # read ns record from database
138 nsr = self.db.get_one(table='nsrs', q_filter=filter)
139 current_ns_status = nsr.get('nsState')
140
141 # get vca status for NS
142 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
143
144 # vcaStatus
145 db_dict = dict()
146 db_dict['vcaStatus'] = status_dict
147
148 # update configurationStatus for this VCA
149 try:
150 vca_index = int(path[path.rfind(".")+1:])
151
152 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
153 vca_status = vca_list[vca_index].get('status')
154
155 configuration_status_list = nsr.get('configurationStatus')
156 config_status = configuration_status_list[vca_index].get('status')
157
158 if config_status == 'BROKEN' and vca_status != 'failed':
159 db_dict['configurationStatus'][vca_index] = 'READY'
160 elif config_status != 'BROKEN' and vca_status == 'failed':
161 db_dict['configurationStatus'][vca_index] = 'BROKEN'
162 except Exception as e:
163 # not update configurationStatus
164 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
165
166 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
167 # if nsState = 'DEGRADED' check if all is OK
168 is_degraded = False
169 if current_ns_status in ('READY', 'DEGRADED'):
170 error_description = ''
171 # check machines
172 if status_dict.get('machines'):
173 for machine_id in status_dict.get('machines'):
174 machine = status_dict.get('machines').get(machine_id)
175 # check machine agent-status
176 if machine.get('agent-status'):
177 s = machine.get('agent-status').get('status')
178 if s != 'started':
179 is_degraded = True
180 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
181 # check machine instance status
182 if machine.get('instance-status'):
183 s = machine.get('instance-status').get('status')
184 if s != 'running':
185 is_degraded = True
186 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
187 # check applications
188 if status_dict.get('applications'):
189 for app_id in status_dict.get('applications'):
190 app = status_dict.get('applications').get(app_id)
191 # check application status
192 if app.get('status'):
193 s = app.get('status').get('status')
194 if s != 'active':
195 is_degraded = True
196 error_description += 'application {} status={} ; '.format(app_id, s)
197
198 if error_description:
199 db_dict['errorDescription'] = error_description
200 if current_ns_status == 'READY' and is_degraded:
201 db_dict['nsState'] = 'DEGRADED'
202 if current_ns_status == 'DEGRADED' and not is_degraded:
203 db_dict['nsState'] = 'READY'
204
205 # write to database
206 self.update_db_2("nsrs", nsr_id, db_dict)
207
208 except Exception as e:
209 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
210
211 return
212
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Generates the vnfd descriptor to be sent to RO, taking as base the OSM IM vnfd. The keys not used by RO are
    removed and the cloud-init (inline or read from file) of every vdu is rendered with Jinja2
    :param vnfd: input vnfd. It is not modified
    :param new_id: if provided it is used as the "id" of the generated descriptor
    :param additionalParams: Instantiation params for VNFs provided; used as the Jinja2 variables
    :param nsrId: Id of the NSR. Not used here
    :return: the new descriptor, a modified deep copy of vnfd
    """
    # keys unused by RO: configuration, monitoring, scaling and internal ones
    keys_unused_by_ro = ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                         "kdu", "k8s-cluster")
    try:
        vnfd_RO = deepcopy(vnfd)
        for unused_key in keys_unused_by_ro:
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    raw_cloud_init = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                raw_cloud_init = vdu["cloud-init"]
            else:
                # nothing to render at this vdu
                continue

            # every variable used at the template must come at the instantiation parameters
            parsed = Environment().parse(raw_cloud_init)
            for var in meta.find_undeclared_variables(parsed):
                if not (additionalParams and var in additionalParams.keys()):
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(raw_cloud_init).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
270
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its "constituent-vnfd" list relates every member-vnf-index with its vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd content
    :param n2vc_key_list: list stored as "mgmt_keys" at every vdu that requires ssh access. Can be None
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM/WIM account is
        not ENABLED or an instantiation parameter references an element not present at the descriptors
    """
    # caches of already translated accounts, shared by the two helpers below
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the id used at RO; the VIM must be ENABLED
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # translates an OSM wim account id into the id used at RO. A non string value is returned as it is
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the key names and values that RO expects
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the keys at the vdus that declare ssh access as required
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # a vdu without "interface" must not break the loop
                for vdu_interface in vdu.get("interface") or ():
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # a nsd without "constituent-vnfd" must not break the loop
            for vnf_member in nsd.get("constituent-vnfd") or ():
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu has an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor keep the values of the match found above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
541
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr and writes the resulting "vdur" list to database table "vnfrs"
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. It is not modified
    :return: None. LcmException is raised if a requested vdu-id-ref has not been (completely) processed
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # go from last to first, so that insertions and deletions do not move the entries pending to visit
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # every new vdur is a copy of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each direction has its own message; checking both dictionaries at the first
    # condition reported pending deletions as a scaling OUT error and left the second check unreachable
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
580
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the information that RO reports for its net
    :param ns_update_nsr: dictionary where the changes are added, one "vld.<index>" entry per vld
    :param db_nsr: content of db_nsr. Its vld items are modified too
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" list is inspected
    :return: Nothing, LcmException is raised when RO does not report a net for a vld
    """
    # (key at the nsr vld, key at the RO net)
    copied_fields = (("vim-id", "vim_net_id"), ("name", "vim_name"), ("status", "status"),
                     ("status-detailed", "error_msg"))

    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        ro_net = next((candidate for candidate in get_iterable(nsr_desc_RO, "nets")
                       if ns_vld["id"] == candidate.get("ns_net_osm_id")), None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))
        for vld_key, ro_key in copied_fields:
            ns_vld[vld_key] = ro_net.get(ro_key)
        ns_update_nsr["vld.{}".format(position)] = ns_vld
602
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Fills every vnfr with the information reported by RO (management ip-address, and vim-id, name, status...
    of its vdur and vld) and writes the changes to database table "vnfrs"
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content. The content is modified too
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" and "nets" lists are inspected
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP if neither RO nor the vnfr provide
        an ip-address for the vnf
    """
    for member_index, vnfr in db_vnfrs.items():
        ro_vnf = next((item for item in nsr_desc_RO["vnfs"] if item["member_vnf_index"] == member_index), None)
        if ro_vnf is None:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(member_index))

        changes = {}
        if ro_vnf.get("ip_address"):
            # only the first address of the ';' separated list is taken
            mgmt_ip = ro_vnf["ip_address"].split(";")[0]
            changes["ip-address"] = mgmt_ip
            vnfr["ip-address"] = mgmt_ip
        elif not vnfr.get("ip-address"):
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(member_index))

        for vdur_position, vdur in enumerate(get_iterable(vnfr, "vdur")):
            if vdur.get("pdu-type"):
                continue
            # number of RO vms with the same vdu id already skipped; used to match the count-index
            same_vdu_skipped = 0
            for ro_vm in get_iterable(ro_vnf, "vms"):
                if vdur["vdu-id-ref"] != ro_vm["vdu_osm_id"]:
                    continue
                if vdur["count-index"] != same_vdu_skipped:
                    same_vdu_skipped += 1
                    continue
                vdur["vim-id"] = ro_vm.get("vim_vm_id")
                vdur["ip-address"] = ro_vm["ip_address"].split(";")[0] if ro_vm.get("ip_address") else None
                vdur["vdu-id-ref"] = ro_vm.get("vdu_osm_id")
                vdur["name"] = ro_vm.get("vim_name")
                vdur["status"] = ro_vm.get("status")
                vdur["status-detailed"] = ro_vm.get("error_msg")
                for iface in get_iterable(vdur, "interfaces"):
                    ro_iface = next((candidate for candidate in get_iterable(ro_vm, "interfaces")
                                     if iface["name"] == candidate.get("internal_name")), None)
                    if ro_iface is None:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                           "from VIM info"
                                           .format(member_index, vdur["vdu-id-ref"], iface["name"]))
                    iface["ip-address"] = ro_iface.get("ip_address")
                    iface["mac-address"] = ro_iface.get("mac_address")
                changes["vdur.{}".format(vdur_position)] = vdur
                break
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                   "VIM info".format(member_index, vdur["vdu-id-ref"], vdur["count-index"]))

        for vld_position, vnf_vld in enumerate(get_iterable(vnfr, "vld")):
            ro_net = next((candidate for candidate in get_iterable(nsr_desc_RO, "nets")
                           if vnf_vld["id"] == candidate.get("vnf_net_osm_id")), None)
            if ro_net is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                    member_index, vnf_vld["id"]))
            vnf_vld["vim-id"] = ro_net.get("vim_net_id")
            vnf_vld["name"] = ro_net.get("vim_name")
            vnf_vld["status"] = ro_net.get("status")
            vnf_vld["status-detailed"] = ro_net.get("error_msg")
            changes["vld.{}".format(vld_position)] = vnf_vld

        self.update_db_2("vnfrs", vnfr["_id"], changes)
674
675 def _get_ns_config_info(self, nsr_id):
676 """
677 Generates a mapping between vnf,vdu elements and the N2VC id
678 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
679 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
680 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
681 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
682 """
683 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
684 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
685 mapping = {}
686 ns_config_info = {"osm-config-mapping": mapping}
687 for vca in vca_deployed_list:
688 if not vca["member-vnf-index"]:
689 continue
690 if not vca["vdu_id"]:
691 mapping[vca["member-vnf-index"]] = vca["application"]
692 else:
693 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
694 vca["application"]
695 return ns_config_info
696
697 @staticmethod
698 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
699 """
700 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
701 primitives as verify-ssh-credentials, or config when needed
702 :param desc_primitive_list: information of the descriptor
703 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
704 this element contains a ssh public key
705 :return: The modified list. Can ba an empty list, but always a list
706 """
707 if desc_primitive_list:
708 primitive_list = desc_primitive_list.copy()
709 else:
710 primitive_list = []
711 # look for primitive config, and get the position. None if not present
712 config_position = None
713 for index, primitive in enumerate(primitive_list):
714 if primitive["name"] == "config":
715 config_position = index
716 break
717
718 # for NS, add always a config primitive if not present (bug 874)
719 if not vca_deployed["member-vnf-index"] and config_position is None:
720 primitive_list.insert(0, {"name": "config", "parameter": []})
721 config_position = 0
722 # for VNF/VDU add verify-ssh-credentials after config
723 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
724 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
725 return primitive_list
726
727 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
728 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
729
730 db_nsr_update = {}
731 RO_descriptor_number = 0 # number of descriptors created at RO
732 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
733 start_deploy = time()
734 vdu_flag = False # If any of the VNFDs has VDUs
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # deploy RO
742
743 # get vnfds, instantiate at RO
744
745 for c_vnf in nsd.get("constituent-vnfd", ()):
746 member_vnf_index = c_vnf["member-vnf-index"]
747 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
748 if vnfd.get("vdu"):
749 vdu_flag = True
750 vnfd_ref = vnfd["id"]
751 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
752 " RO".format(vnfd_ref, member_vnf_index)
753 # self.logger.debug(logging_text + step)
754 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
755 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
756 RO_descriptor_number += 1
757
758 # look position at deployed.RO.vnfd if not present it will be appended at the end
759 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
760 if vnf_deployed["member-vnf-index"] == member_vnf_index:
761 break
762 else:
763 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
764 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
765
766 # look if present
767 RO_update = {"member-vnf-index": member_vnf_index}
768 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
769 if vnfd_list:
770 RO_update["id"] = vnfd_list[0]["uuid"]
771 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
772 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
773 else:
774 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
775 get("additionalParamsForVnf"), nsr_id)
776 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
777 RO_update["id"] = desc["uuid"]
778 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
779 vnfd_ref, member_vnf_index, desc["uuid"]))
780 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
781 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
782 self.update_db_2("nsrs", nsr_id, db_nsr_update)
783
784 # create nsd at RO
785 nsd_ref = nsd["id"]
786 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
787 # self.logger.debug(logging_text + step)
788
789 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
790 RO_descriptor_number += 1
791 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
792 if nsd_list:
793 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
794 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
795 nsd_ref, RO_nsd_uuid))
796 else:
797 nsd_RO = deepcopy(nsd)
798 nsd_RO["id"] = RO_osm_nsd_id
799 nsd_RO.pop("_id", None)
800 nsd_RO.pop("_admin", None)
801 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
802 member_vnf_index = c_vnf["member-vnf-index"]
803 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
804 for c_vld in nsd_RO.get("vld", ()):
805 for cp in c_vld.get("vnfd-connection-point-ref", ()):
806 member_vnf_index = cp["member-vnf-index-ref"]
807 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
808
809 desc = await self.RO.create("nsd", descriptor=nsd_RO)
810 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
811 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
812 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
813 self.update_db_2("nsrs", nsr_id, db_nsr_update)
814
815 # Crate ns at RO
816 # if present use it unless in error status
817 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
818 if RO_nsr_id:
819 try:
820 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
821 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
822 desc = await self.RO.show("ns", RO_nsr_id)
823
824 except ROclient.ROClientException as e:
825 if e.http_code != HTTPStatus.NOT_FOUND:
826 raise
827 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
828 if RO_nsr_id:
829 ns_status, ns_status_info = self.RO.check_ns_status(desc)
830 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
831 if ns_status == "ERROR":
832 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
833 .format(RO_nsr_id)
834 self.logger.debug(logging_text + step)
835 await self.RO.delete("ns", RO_nsr_id)
836 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
837 if not RO_nsr_id:
838 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
839 # self.logger.debug(logging_text + step)
840
841 # check if VIM is creating and wait look if previous tasks in process
842 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
843 if task_dependency:
844 step = "Waiting for related tasks to be completed: {}".format(task_name)
845 self.logger.debug(logging_text + step)
846 await asyncio.wait(task_dependency, timeout=3600)
847 if ns_params.get("vnf"):
848 for vnf in ns_params["vnf"]:
849 if "vimAccountId" in vnf:
850 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
851 vnf["vimAccountId"])
852 if task_dependency:
853 step = "Waiting for related tasks to be completed: {}".format(task_name)
854 self.logger.debug(logging_text + step)
855 await asyncio.wait(task_dependency, timeout=3600)
856
857 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
858
859 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
860
861 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
862 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
863 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
864 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
865 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
866 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
867 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
868 self.update_db_2("nsrs", nsr_id, db_nsr_update)
869
870 # wait until NS is ready
871 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
872 detailed_status_old = None
873 self.logger.debug(logging_text + step)
874
875 old_desc = None
876 while time() <= start_deploy + timeout_ns_deploy:
877 desc = await self.RO.show("ns", RO_nsr_id)
878
879 # deploymentStatus
880 if desc != old_desc:
881 # desc has changed => update db
882 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
883 old_desc = desc
884
885 ns_status, ns_status_info = self.RO.check_ns_status(desc)
886 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
887 if ns_status == "ERROR":
888 raise ROclient.ROClientException(ns_status_info)
889 elif ns_status == "BUILD":
890 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
891 elif ns_status == "ACTIVE":
892 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
893 try:
894 if vdu_flag:
895 self.ns_update_vnfr(db_vnfrs, desc)
896 break
897 except LcmExceptionNoMgmtIP:
898 pass
899 else:
900 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
901 if detailed_status != detailed_status_old:
902 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 await asyncio.sleep(5, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise ROclient.ROClientException("Timeout waiting ns to be ready")
907
908 step = "Updating NSR"
909 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
910
911 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
912 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
913 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
915 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
916
917 step = "Deployed at VIM"
918 self.logger.debug(logging_text + step)
919
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for the ip address of the target VM and, optionally, inject a public ssh key into it through RO.

    The database is polled every 10 seconds, for at most 360 iterations (about one hour). Key injection is
    only attempted when both 'pub_key' and 'user' are provided, and it is retried up to 20 times when RO
    answers with an error.
    :param logging_text: prefix use for logging
    :param nsr_id: _id of the record at 'nsrs' collection
    :param vnfr_id: _id of the record at 'vnfrs' collection
    :param vdu_id: 'vdu-id-ref' of the target vdur. If empty the vdur holding the VNF 'ip-address' is used
    :param vdu_index: 'count-index' of the target vdur. Only used when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, when the vdur is not found or is in ERROR status, when RO returns an
        unknown response or when the maximum number of key injection tries is reached
    """
    poll_interval = 10   # seconds between two consecutive polls
    max_polls = 360      # 360 polls * 10 seconds = 1 hour
    max_key_tries = 20   # maximum number of attempts to inject the key through RO

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:
        ro_retries += 1
        if ro_retries >= max_polls:
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            # the NS is known at RO, so the timeout is due to the VM not being ready
            raise LcmException("Timeout waiting for the target VM to be up. nsr_id: {}, vnfr_id: {}, vdu_id: {}"
                               .format(nsr_id, vnfr_id, vdu_id))

        # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(poll_interval)

        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            vdur_status = vdur.get("status")
            if vdur_status == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur_status == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        if not (pub_key and user):
            # nothing to inject; the ip address is all that was needed
            break

        # inject public key into machine
        # self.logger.debug(logging_text + "Inserting RO key")
        try:
            ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
            result_dict = await self.RO.create_action(
                item="ns",
                item_id_name=ro_nsr_id,
                descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
            )
            # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
            if not result_dict or not isinstance(result_dict, dict):
                raise LcmException("Unknown response from RO when injecting key")
            # only the first entry is evaluated, as a single VM was requested
            result = next(iter(result_dict.values()))
            if result.get("vim_result") != 200:
                raise ROclient.ROClientException("error injecting key: {}".format(
                    result.get("description")))
            break
        except ROclient.ROClientException as e:
            if not nb_tries:
                self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                  format(e, max_key_tries * poll_interval))
            nb_tries += 1
            if nb_tries >= max_key_tries:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e))

    return ip_address
1016
1017 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1018 """
1019 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1020 """
1021 my_vca = vca_deployed_list[vca_index]
1022 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1023 # vdu or kdu: no dependencies
1024 return
1025 timeout = 300
1026 while timeout >= 0:
1027 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1028 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1029 configuration_status_list = db_nsr["configurationStatus"]
1030 for index, vca_deployed in enumerate(configuration_status_list):
1031 if index == vca_index:
1032 # myself
1033 continue
1034 if not my_vca.get("member-vnf-index") or \
1035 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1036 internal_status = configuration_status_list[index].get("status")
1037 if internal_status == 'READY':
1038 continue
1039 elif internal_status == 'BROKEN':
1040 raise LcmException("Configuration aborted because dependent charm/s has failed")
1041 else:
1042 break
1043 else:
1044 # no dependencies, return
1045 return
1046 await asyncio.sleep(10)
1047 timeout -= 1
1048
1049 raise LcmException("Configuration aborted because dependent charm/s timeout")
1050
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
                           vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
    """
    Deploy one charm: create or register its execution environment, install the configuration software,
    wait for the VM ip address (injecting the ssh key when required) and run the initial config primitives.

    The progress is written at 'configurationStatus.<vca_index>' of the nsr record. On any failure this
    status is set to 'BROKEN' and a generic Exception is raised, prefixed with the step that was running.
    :param logging_text: prefix use for logging
    :param vca_index: position of this charm at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: first component of the namespace; can be None
    :param db_nsr: content of the nsr record. "_id" and "_admin.deployed.VCA" are read
    :param db_vnfr: content of the vnfr record, None for a NS charm
    :param vdu_id: vdu identifier, None if this is not a VDU charm
    :param kdu_name: not used by this method
    :param vdu_index: instance index of the vdu. A falsy value is treated as 0 for the namespace
    :param config_descriptor: configuration descriptor. It must contain the key "juju" with the key "charm"
    :param deploy_params: parameters for the primitives. It is modified: "rw_mgmt_ip" and, for NS charms,
        "ns_config_info" are added
    :param base_folder: dictionary with the keys "folder" and "pkg-dir", used to compose the artifact path
    :param nslcmop_id: operation id where the stage is written
    :return: None
    :raises Exception: on any error. The original exception is chained
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # it is a proxy charm unless 'juju.proxy' is explicitly set to False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            await self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            await self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'. Skipped when the descriptor defines none, so that a
        # missing list is not reported as an error. A failure while sorting is logged and not raised
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                self.logger.error(logging_text + step + ": " + str(e))

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage = 'Stage 3/5: running Day-1 primitives for VDU'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage = 'Stage 4/5: running Day-1 primitives for VNF'
        else:
            # NS
            stage = 'Stage 5/5: running Day-1 primitives for NS'

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # read with get(), in the same way as done above for selecting the stage
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        await self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1284
1285 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1286 error_description: str = None):
1287 try:
1288 db_dict = dict()
1289 if ns_state:
1290 db_dict["nsState"] = ns_state
1291 db_dict["currentOperation"] = current_operation
1292 db_dict["currentOperationID"] = current_operation_id
1293 db_dict["errorDescription"] = error_description
1294 self.update_db_2("nsrs", nsr_id, db_dict)
1295 except Exception as e:
1296 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1297
1298 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1299 try:
1300 db_dict = dict()
1301 db_dict['queuePosition'] = queuePosition
1302 db_dict['stage'] = stage
1303 if error_message:
1304 db_dict['errorMessage'] = error_message
1305 self.update_db_2("nslcmops", op_id, db_dict)
1306 except Exception as e:
1307 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1308
1309 def _write_all_config_status(self, nsr_id: str, status: str):
1310 try:
1311 # nsrs record
1312 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1313 # configurationStatus
1314 config_status = db_nsr.get('configurationStatus')
1315 if config_status:
1316 # update status
1317 db_dict = dict()
1318 db_dict['configurationStatus'] = list()
1319 for c in config_status:
1320 c['status'] = status
1321 db_dict['configurationStatus'].append(c)
1322 self.update_db_2("nsrs", nsr_id, db_dict)
1323
1324 except Exception as e:
1325 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1326
1327 async def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str,
1328 element_under_configuration: str = None, element_type: str = None):
1329
1330 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1331 # .format(vca_index, status))
1332
1333 try:
1334 db_path = 'configurationStatus.{}.'.format(vca_index)
1335 db_dict = dict()
1336 db_dict[db_path + 'status'] = status
1337 if element_under_configuration:
1338 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1339 if element_type:
1340 db_dict[db_path + 'elementType'] = element_type
1341 self.update_db_2("nsrs", nsr_id, db_dict)
1342 except Exception as e:
1343 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1344 .format(status, nsr_id, vca_index, e))
1345
1346 async def instantiate(self, nsr_id, nslcmop_id):
1347 """
1348
1349 :param nsr_id: ns instance to deploy
1350 :param nslcmop_id: operation to run
1351 :return:
1352 """
1353
1354 # Try to lock HA task here
1355 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1356 if not task_is_locked_by_me:
1357 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1358 return
1359
1360 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1361 self.logger.debug(logging_text + "Enter")
1362
1363 # get all needed from database
1364
1365 # database nsrs record
1366 db_nsr = None
1367
1368 # database nslcmops record
1369 db_nslcmop = None
1370
1371 # update operation on nsrs
1372 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1373 "_admin.current-operation": nslcmop_id,
1374 "_admin.operation-type": "instantiate"}
1375 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1376
1377 # update operation on nslcmops
1378 db_nslcmop_update = {}
1379
1380 nslcmop_operation_state = None
1381 db_vnfrs = {} # vnf's info indexed by member-index
1382 # n2vc_info = {}
1383 task_instantiation_list = []
1384 task_instantiation_info = {} # from task to info text
1385 exc = None
1386 try:
1387 # wait for any previous tasks in process
1388 step = "Waiting for previous operations to terminate"
1389 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1390
1391 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1392
1393 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1394 self._write_ns_status(
1395 nsr_id=nsr_id,
1396 ns_state="BUILDING",
1397 current_operation="INSTANTIATING",
1398 current_operation_id=nslcmop_id
1399 )
1400
1401 # read from db: operation
1402 step = "Getting nslcmop={} from db".format(nslcmop_id)
1403 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1404 ns_params = db_nslcmop.get("operationParams")
1405 if ns_params and ns_params.get("timeout_ns_deploy"):
1406 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1407 else:
1408 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1409
1410 # read from db: ns
1411 step = "Getting nsr={} from db".format(nsr_id)
1412 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1413 # nsd is replicated into ns (no db read)
1414 nsd = db_nsr["nsd"]
1415 # nsr_name = db_nsr["name"] # TODO short-name??
1416
1417 # read from db: vnf's of this ns
1418 step = "Getting vnfrs from db"
1419 self.logger.debug(logging_text + step)
1420 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1421
1422 # read from db: vnfd's for every vnf
1423 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1424 db_vnfds = {} # every vnfd data indexed by vnf id
1425 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1426
1427 self._write_op_status(
1428 op_id=nslcmop_id,
1429 stage='Stage 1/5: preparation of the environment',
1430 queuePosition=0
1431 )
1432
1433 # for each vnf in ns, read vnfd
1434 for vnfr in db_vnfrs_list:
1435 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1436 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1437 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1438 # if we haven't this vnfd, read it from db
1439 if vnfd_id not in db_vnfds:
1440 # read from cb
1441 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1442 self.logger.debug(logging_text + step)
1443 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1444
1445 # store vnfd
1446 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1447 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1448 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1449
1450 # Get or generates the _admin.deployed.VCA list
1451 vca_deployed_list = None
1452 if db_nsr["_admin"].get("deployed"):
1453 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1454 if vca_deployed_list is None:
1455 vca_deployed_list = []
1456 configuration_status_list = []
1457 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1458 db_nsr_update["configurationStatus"] = configuration_status_list
1459 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1460 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1461 elif isinstance(vca_deployed_list, dict):
1462 # maintain backward compatibility. Change a dict to list at database
1463 vca_deployed_list = list(vca_deployed_list.values())
1464 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1465 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1466
1467 db_nsr_update["detailed-status"] = "creating"
1468 db_nsr_update["operational-status"] = "init"
1469
1470 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1471 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1472 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1473
1474 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1475 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1476 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1477
1478 # n2vc_redesign STEP 2 Deploy Network Scenario
1479
1480 self._write_op_status(
1481 op_id=nslcmop_id,
1482 stage='Stage 2/5: deployment of VMs and execution environments'
1483 )
1484
1485 self.logger.debug(logging_text + "Before deploy_kdus")
1486 # Call to deploy_kdus in case exists the "vdu:kdu" param
1487 task_kdu = asyncio.ensure_future(
1488 self.deploy_kdus(
1489 logging_text=logging_text,
1490 nsr_id=nsr_id,
1491 db_nsr=db_nsr,
1492 db_vnfrs=db_vnfrs,
1493 )
1494 )
1495 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1496 task_instantiation_info[task_kdu] = "Deploy KDUs"
1497 task_instantiation_list.append(task_kdu)
1498 # n2vc_redesign STEP 1 Get VCA public ssh-key
1499 # feature 1429. Add n2vc public key to needed VMs
1500 n2vc_key = self.n2vc.get_public_key()
1501 n2vc_key_list = [n2vc_key]
1502 if self.vca_config.get("public_key"):
1503 n2vc_key_list.append(self.vca_config["public_key"])
1504
1505 task_ro = asyncio.ensure_future(
1506 self.instantiate_RO(
1507 logging_text=logging_text,
1508 nsr_id=nsr_id,
1509 nsd=nsd,
1510 db_nsr=db_nsr,
1511 db_nslcmop=db_nslcmop,
1512 db_vnfrs=db_vnfrs,
1513 db_vnfds_ref=db_vnfds_ref,
1514 n2vc_key_list=n2vc_key_list
1515 )
1516 )
1517 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1518 task_instantiation_info[task_ro] = "Deploy at VIM"
1519 task_instantiation_list.append(task_ro)
1520
1521 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1522 step = "Looking for needed vnfd to configure with proxy charm"
1523 self.logger.debug(logging_text + step)
1524
1525 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1526 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1527 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1528 vnfd_id = c_vnf["vnfd-id-ref"]
1529 vnfd = db_vnfds_ref[vnfd_id]
1530 member_vnf_index = str(c_vnf["member-vnf-index"])
1531 db_vnfr = db_vnfrs[member_vnf_index]
1532 base_folder = vnfd["_admin"]["storage"]
1533 vdu_id = None
1534 vdu_index = 0
1535 vdu_name = None
1536 kdu_name = None
1537
1538 # Get additional parameters
1539 deploy_params = {}
1540 if db_vnfr.get("additionalParamsForVnf"):
1541 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1542
1543 descriptor_config = vnfd.get("vnf-configuration")
1544 if descriptor_config and descriptor_config.get("juju"):
1545 self._deploy_n2vc(
1546 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1547 db_nsr=db_nsr,
1548 db_vnfr=db_vnfr,
1549 nslcmop_id=nslcmop_id,
1550 nsr_id=nsr_id,
1551 nsi_id=nsi_id,
1552 vnfd_id=vnfd_id,
1553 vdu_id=vdu_id,
1554 kdu_name=kdu_name,
1555 member_vnf_index=member_vnf_index,
1556 vdu_index=vdu_index,
1557 vdu_name=vdu_name,
1558 deploy_params=deploy_params,
1559 descriptor_config=descriptor_config,
1560 base_folder=base_folder,
1561 task_instantiation_list=task_instantiation_list,
1562 task_instantiation_info=task_instantiation_info
1563 )
1564
1565 # Deploy charms for each VDU that supports one.
1566 for vdud in get_iterable(vnfd, 'vdu'):
1567 vdu_id = vdud["id"]
1568 descriptor_config = vdud.get('vdu-configuration')
1569 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1570 if vdur.get("additionalParams"):
1571 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1572 else:
1573 deploy_params_vdu = deploy_params
1574 if descriptor_config and descriptor_config.get("juju"):
1575 # look for vdu index in the db_vnfr["vdu"] section
1576 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1577 # if vdur["vdu-id-ref"] == vdu_id:
1578 # break
1579 # else:
1580 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1581 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1582 # vdu_name = vdur.get("name")
1583 vdu_name = None
1584 kdu_name = None
1585 for vdu_index in range(int(vdud.get("count", 1))):
1586 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1587 self._deploy_n2vc(
1588 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1589 member_vnf_index, vdu_id, vdu_index),
1590 db_nsr=db_nsr,
1591 db_vnfr=db_vnfr,
1592 nslcmop_id=nslcmop_id,
1593 nsr_id=nsr_id,
1594 nsi_id=nsi_id,
1595 vnfd_id=vnfd_id,
1596 vdu_id=vdu_id,
1597 kdu_name=kdu_name,
1598 member_vnf_index=member_vnf_index,
1599 vdu_index=vdu_index,
1600 vdu_name=vdu_name,
1601 deploy_params=deploy_params_vdu,
1602 descriptor_config=descriptor_config,
1603 base_folder=base_folder,
1604 task_instantiation_list=task_instantiation_list,
1605 task_instantiation_info=task_instantiation_info
1606 )
1607 for kdud in get_iterable(vnfd, 'kdu'):
1608 kdu_name = kdud["name"]
1609 descriptor_config = kdud.get('kdu-configuration')
1610 if descriptor_config and descriptor_config.get("juju"):
1611 vdu_id = None
1612 vdu_index = 0
1613 vdu_name = None
1614 # look for vdu index in the db_vnfr["vdu"] section
1615 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1616 # if vdur["vdu-id-ref"] == vdu_id:
1617 # break
1618 # else:
1619 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1620 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1621 # vdu_name = vdur.get("name")
1622 # vdu_name = None
1623
1624 self._deploy_n2vc(
1625 logging_text=logging_text,
1626 db_nsr=db_nsr,
1627 db_vnfr=db_vnfr,
1628 nslcmop_id=nslcmop_id,
1629 nsr_id=nsr_id,
1630 nsi_id=nsi_id,
1631 vnfd_id=vnfd_id,
1632 vdu_id=vdu_id,
1633 kdu_name=kdu_name,
1634 member_vnf_index=member_vnf_index,
1635 vdu_index=vdu_index,
1636 vdu_name=vdu_name,
1637 deploy_params=deploy_params,
1638 descriptor_config=descriptor_config,
1639 base_folder=base_folder,
1640 task_instantiation_list=task_instantiation_list,
1641 task_instantiation_info=task_instantiation_info
1642 )
1643
1644 # Check if this NS has a charm configuration
1645 descriptor_config = nsd.get("ns-configuration")
1646 if descriptor_config and descriptor_config.get("juju"):
1647 vnfd_id = None
1648 db_vnfr = None
1649 member_vnf_index = None
1650 vdu_id = None
1651 kdu_name = None
1652 vdu_index = 0
1653 vdu_name = None
1654
1655 # Get additional parameters
1656 deploy_params = {}
1657 if db_nsr.get("additionalParamsForNs"):
1658 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1659 base_folder = nsd["_admin"]["storage"]
1660 self._deploy_n2vc(
1661 logging_text=logging_text,
1662 db_nsr=db_nsr,
1663 db_vnfr=db_vnfr,
1664 nslcmop_id=nslcmop_id,
1665 nsr_id=nsr_id,
1666 nsi_id=nsi_id,
1667 vnfd_id=vnfd_id,
1668 vdu_id=vdu_id,
1669 kdu_name=kdu_name,
1670 member_vnf_index=member_vnf_index,
1671 vdu_index=vdu_index,
1672 vdu_name=vdu_name,
1673 deploy_params=deploy_params,
1674 descriptor_config=descriptor_config,
1675 base_folder=base_folder,
1676 task_instantiation_list=task_instantiation_list,
1677 task_instantiation_info=task_instantiation_info
1678 )
1679
1680 # Wait until all tasks of "task_instantiation_list" have been finished
1681
1682 error_text_list = []
1683
1684 # let's begin with all OK
1685 instantiated_ok = True
1686 # let's begin with RO 'running' status (later we can change it)
1687 db_nsr_update["operational-status"] = "running"
1688 # let's begin with VCA 'configured' status (later we can change it)
1689 db_nsr_update["config-status"] = "configured"
1690
1691 if task_instantiation_list:
1692 # wait for all tasks completion
1693 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1694
1695 for task in pending:
1696 instantiated_ok = False
1697 if task == task_ro:
1698 # RO task is pending
1699 db_nsr_update["operational-status"] = "failed"
1700 elif task == task_kdu:
1701 # KDU task is pending
1702 db_nsr_update["operational-status"] = "failed"
1703 else:
1704 # A N2VC task is pending
1705 db_nsr_update["config-status"] = "failed"
1706 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1707 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1708 for task in done:
1709 if task.cancelled():
1710 instantiated_ok = False
1711 if task == task_ro:
1712 # RO task was cancelled
1713 db_nsr_update["operational-status"] = "failed"
1714 elif task == task_kdu:
1715 # KDU task was cancelled
1716 db_nsr_update["operational-status"] = "failed"
1717 else:
1718 # A N2VC was cancelled
1719 db_nsr_update["config-status"] = "failed"
1720 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1721 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1722 else:
1723 exc = task.exception()
1724 if exc:
1725 instantiated_ok = False
1726 if task == task_ro:
1727 # RO task raised an exception
1728 db_nsr_update["operational-status"] = "failed"
1729 elif task == task_kdu:
1730 # KDU task raised an exception
1731 db_nsr_update["operational-status"] = "failed"
1732 else:
1733 # A N2VC task raised an exception
1734 db_nsr_update["config-status"] = "failed"
1735 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1736
1737 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1738 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1739 else:
1740 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1741 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1742 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1743 else:
1744 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1745
1746 if error_text_list:
1747 error_text = "\n".join(error_text_list)
1748 db_nsr_update["detailed-status"] = error_text
1749 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1750 db_nslcmop_update["detailed-status"] = error_text
1751 db_nslcmop_update["statusEnteredTime"] = time()
1752 else:
1753 # all is done
1754 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1755 db_nslcmop_update["statusEnteredTime"] = time()
1756 db_nslcmop_update["detailed-status"] = "done"
1757 db_nsr_update["detailed-status"] = "done"
1758
1759 except (ROclient.ROClientException, DbException, LcmException) as e:
1760 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1761 exc = e
1762 except asyncio.CancelledError:
1763 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1764 exc = "Operation was cancelled"
1765 except Exception as e:
1766 exc = traceback.format_exc()
1767 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1768 exc_info=True)
1769 finally:
1770 if exc:
1771 if db_nsr:
1772 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1773 db_nsr_update["operational-status"] = "failed"
1774 db_nsr_update["config-status"] = "failed"
1775 if db_nslcmop:
1776 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1777 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1778 db_nslcmop_update["statusEnteredTime"] = time()
1779 try:
1780 if db_nsr:
1781 db_nsr_update["_admin.nslcmop"] = None
1782 db_nsr_update["_admin.current-operation"] = None
1783 db_nsr_update["_admin.operation-type"] = None
1784 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1785
1786 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1787 ns_state = None
1788 error_description = None
1789 if instantiated_ok:
1790 ns_state = "READY"
1791 else:
1792 ns_state = "BROKEN"
1793 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1794
1795 self._write_ns_status(
1796 nsr_id=nsr_id,
1797 ns_state=ns_state,
1798 current_operation="IDLE",
1799 current_operation_id=None,
1800 error_description=error_description
1801 )
1802
1803 self._write_op_status(
1804 op_id=nslcmop_id,
1805 error_message=error_description
1806 )
1807
1808 if db_nslcmop_update:
1809 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1810
1811 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1812
1813 except DbException as e:
1814 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1815
1816 if nslcmop_operation_state:
1817 try:
1818 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1819 "operationState": nslcmop_operation_state},
1820 loop=self.loop)
1821 except Exception as e:
1822 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1823
1824 self.logger.debug(logging_text + "Exit")
1825 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1826
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """
    Launch the KDUs found at the "kdur" entries of the vnfrs and wait until all of them finish.

    Every KDU is annotated at database, collection nsrs, under "_admin.deployed.K8s.<index>". When its
    installation task finishes, the task result is stored at "kdu-instance", or the error text at
    "detailed-status".
    :param logging_text: prefix for the log messages of this method
    :param nsr_id: _id of the nsrs database entry to update
    :param db_nsr: not used by this method
    :param db_vnfrs: dictionary whose values are the vnfr contents to inspect
    :return: None
    :raises LcmException: if a launched KDU fails or times out, or any other exception happens meanwhile
    """
    deployed_ok = True
    # Bound before the 'try' so that the 'except' clause can always report the current step
    step = "Initializing the list of deployed KDUs at database"
    db_nsr_update = {"_admin.deployed.K8s": []}

    # cache: cluster type -> {k8scluster _id: id stored at its _admin.<cluster type>.id}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}  # installation task -> database path prefix of its K8s entry
        # NOTE(review): index only advances for launched KDUs, so the entry written for a KDU that is
        # skipped because of an error is overwritten by the next KDU - confirm this is intended
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"
                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                except LcmException as e:
                    error_text = str(e)
                    deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1
        if not pending_tasks:
            return
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:   # timeout
                for task in pending_list:
                    # pending_tasks is a dictionary: it must be indexed, not called
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                deployed_ok = False
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                    deployed_ok = False
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(
            logging_text, type(e).__name__, step, e)) from e
    finally:
        # store whatever has been annotated, also when exiting with an exception
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1938
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_list, task_instantiation_info):
    """
    Launch instantiate_N2VC in an asyncio task for one charm and register the task object.

    The charm is looked up at db_nsr["_admin"]["deployed"]["VCA"] by member-vnf-index, vdu_id, kdu_name and
    vdu_count_index. If it is not there, a new entry is appended to that list and written at database
    together with an empty configurationStatus entry at the same index.
    :param db_nsr: nsr content; its _admin.deployed.VCA list can be extended by this method
    :param nsr_id: _id of the nsrs database entry to update
    :param vnfd_id: only stored at the new VCA entry
    :param vdu_name: only stored at the new VCA entry
    :param task_instantiation_list: list where the created task is appended
    :param task_instantiation_info: dictionary where a descriptive text is stored using the task as key
    :return: None. The rest of parameters are passed to instantiate_N2VC or used to match the VCA entry
    """
    deployed_vcas = db_nsr["_admin"]["deployed"]["VCA"]

    def _is_this_charm(entry):
        # empty entries of the list are ignored
        if not entry:
            return False
        return entry.get("member-vnf-index") == member_vnf_index and \
            entry.get("vdu_id") == vdu_id and \
            entry.get("kdu_name") == kdu_name and \
            entry.get("vdu_count_index", 0) == vdu_index

    vca_index = next((position for position, entry in enumerate(deployed_vcas) if _is_this_charm(entry)), None)
    if vca_index is None:
        # not found: the new entry goes at the end of the list
        vca_index = len(deployed_vcas)
        new_vca = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",   # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }

        # create VCA and configurationStatus in db
        self.update_db_2("nsrs", nsr_id, {
            "_admin.deployed.VCA.{}".format(vca_index): new_vca,
            "configurationStatus.{}".format(vca_index): dict()
        })

        deployed_vcas.append(new_vca)

    # Launch task
    n2vc_coroutine = self.instantiate_N2VC(
        logging_text=logging_text,
        vca_index=vca_index,
        nsi_id=nsi_id,
        db_nsr=db_nsr,
        db_vnfr=db_vnfr,
        vdu_id=vdu_id,
        kdu_name=kdu_name,
        vdu_index=vdu_index,
        deploy_params=deploy_params,
        config_descriptor=descriptor_config,
        base_folder=base_folder,
        nslcmop_id=nslcmop_id
    )
    task_n2vc = asyncio.ensure_future(n2vc_coroutine)
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
    task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
    task_instantiation_list.append(task_n2vc)
2000
2001 # Check if this VNFD has a configured terminate action
2002 def _has_terminate_config_primitive(self, vnfd):
2003 vnf_config = vnfd.get("vnf-configuration")
2004 if vnf_config and vnf_config.get("terminate-config-primitive"):
2005 return True
2006 else:
2007 return False
2008
2009 @staticmethod
2010 def _get_terminate_config_primitive_seq_list(vnfd):
2011 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2012 # No need to check for existing primitive twice, already done before
2013 vnf_config = vnfd.get("vnf-configuration")
2014 seq_list = vnf_config.get("terminate-config-primitive")
2015 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2016 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2017 return seq_list_sorted
2018
2019 @staticmethod
2020 def _create_nslcmop(nsr_id, operation, params):
2021 """
2022 Creates a ns-lcm-opp content to be stored at database.
2023 :param nsr_id: internal id of the instance
2024 :param operation: instantiate, terminate, scale, action, ...
2025 :param params: user parameters for the operation
2026 :return: dictionary following SOL005 format
2027 """
2028 # Raise exception if invalid arguments
2029 if not (nsr_id and operation and params):
2030 raise LcmException(
2031 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2032 now = time()
2033 _id = str(uuid4())
2034 nslcmop = {
2035 "id": _id,
2036 "_id": _id,
2037 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2038 "operationState": "PROCESSING",
2039 "statusEnteredTime": now,
2040 "nsInstanceId": nsr_id,
2041 "lcmOperationType": operation,
2042 "startTime": now,
2043 "isAutomaticInvocation": False,
2044 "operationParams": params,
2045 "isCancelPending": False,
2046 "links": {
2047 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2048 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2049 }
2050 }
2051 return nslcmop
2052
2053 def _format_additional_params(self, params):
2054 params = params or {}
2055 for key, value in params.items():
2056 if str(value).startswith("!!yaml "):
2057 params[key] = yaml.safe_load(value[7:])
2058 return params
2059
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Obtain the parameters of one terminate primitive, mapped by _map_primitive_params.
    :param seq: one item of the "terminate-config-primitive" list; its 'name' is the primitive name
    :param vnf_index: member vnf index where the primitive applies
    :return: what _map_primitive_params returns for this primitive, with no user parameters and
        empty descriptor parameters
    """
    action_params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        "primitive_params": {},
    }
    return self._map_primitive_params(seq, action_params, {})
2070
2071 # sub-operations
2072
2073 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2074 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2075 if (op.get('operationState') == 'COMPLETED'):
2076 # b. Skip sub-operation
2077 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2078 return self.SUBOPERATION_STATUS_SKIP
2079 else:
2080 # c. Reintent executing sub-operation
2081 # The sub-operation exists, and operationState != 'COMPLETED'
2082 # Update operationState = 'PROCESSING' to indicate a reintent.
2083 operationState = 'PROCESSING'
2084 detailed_status = 'In progress'
2085 self._update_suboperation_status(
2086 db_nslcmop, op_index, operationState, detailed_status)
2087 # Return the sub-operation index
2088 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2089 # with arguments extracted from the sub-operation
2090 return op_index
2091
2092 # Find a sub-operation where all keys in a matching dictionary must match
2093 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2094 def _find_suboperation(self, db_nslcmop, match):
2095 if (db_nslcmop and match):
2096 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2097 for i, op in enumerate(op_list):
2098 if all(op.get(k) == match[k] for k in match):
2099 return i
2100 return self.SUBOPERATION_STATUS_NOT_FOUND
2101
2102 # Update status for a sub-operation given its index
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    Write at database (collection nslcmops) the status of one sub-operation.
    :param db_nslcmop: nslcmop content; only its '_id' is used
    :param op_index: index of the sub-operation at _admin.operations
    :param operationState: value for the 'operationState' of the sub-operation
    :param detailed_status: value for the 'detailed-status' of the sub-operation
    :return: None
    """
    # Update DB for HA tasks
    self.db.set_one(
        "nslcmops",
        q_filter={'_id': db_nslcmop['_id']},
        update_dict={
            '_admin.operations.{}.operationState'.format(op_index): operationState,
            '_admin.operations.{}.detailed-status'.format(op_index): detailed_status,
        },
        fail_on_empty=False)
2112
2113 # Add sub-operation, return the index of the added sub-operation
2114 # Optionally, set operationState, detailed-status, and operationType
2115 # Status and type are currently set for 'scale' sub-operations:
2116 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2117 # 'detailed-status' : status message
2118 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2119 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2120 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2121 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2122 RO_nsr_id=None, RO_scaling_info=None):
2123 if not (db_nslcmop):
2124 return self.SUBOPERATION_STATUS_NOT_FOUND
2125 # Get the "_admin.operations" list, if it exists
2126 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2127 op_list = db_nslcmop_admin.get('operations')
2128 # Create or append to the "_admin.operations" list
2129 new_op = {'member_vnf_index': vnf_index,
2130 'vdu_id': vdu_id,
2131 'vdu_count_index': vdu_count_index,
2132 'primitive': primitive,
2133 'primitive_params': mapped_primitive_params}
2134 if operationState:
2135 new_op['operationState'] = operationState
2136 if detailed_status:
2137 new_op['detailed-status'] = detailed_status
2138 if operationType:
2139 new_op['lcmOperationType'] = operationType
2140 if RO_nsr_id:
2141 new_op['RO_nsr_id'] = RO_nsr_id
2142 if RO_scaling_info:
2143 new_op['RO_scaling_info'] = RO_scaling_info
2144 if not op_list:
2145 # No existing operations, create key 'operations' with current operation as first list element
2146 db_nslcmop_admin.update({'operations': [new_op]})
2147 op_list = db_nslcmop_admin.get('operations')
2148 else:
2149 # Existing operations, append operation to list
2150 op_list.append(new_op)
2151
2152 db_nslcmop_update = {'_admin.operations': op_list}
2153 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2154 op_index = len(op_list) - 1
2155 return op_index
2156
2157 # Helper methods for scale() sub-operations
2158
2159 # pre-scale/post-scale:
2160 # Check for 3 different cases:
2161 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2162 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2163 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2164 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2165 operationType, RO_nsr_id=None, RO_scaling_info=None):
2166 # Find this sub-operation
2167 if (RO_nsr_id and RO_scaling_info):
2168 operationType = 'SCALE-RO'
2169 match = {
2170 'member_vnf_index': vnf_index,
2171 'RO_nsr_id': RO_nsr_id,
2172 'RO_scaling_info': RO_scaling_info,
2173 }
2174 else:
2175 match = {
2176 'member_vnf_index': vnf_index,
2177 'primitive': vnf_config_primitive,
2178 'primitive_params': primitive_params,
2179 'lcmOperationType': operationType
2180 }
2181 op_index = self._find_suboperation(db_nslcmop, match)
2182 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2183 # a. New sub-operation
2184 # The sub-operation does not exist, add it.
2185 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2186 # The following parameters are set to None for all kind of scaling:
2187 vdu_id = None
2188 vdu_count_index = None
2189 vdu_name = None
2190 if (RO_nsr_id and RO_scaling_info):
2191 vnf_config_primitive = None
2192 primitive_params = None
2193 else:
2194 RO_nsr_id = None
2195 RO_scaling_info = None
2196 # Initial status for sub-operation
2197 operationState = 'PROCESSING'
2198 detailed_status = 'In progress'
2199 # Add sub-operation for pre/post-scaling (zero or more operations)
2200 self._add_suboperation(db_nslcmop,
2201 vnf_index,
2202 vdu_id,
2203 vdu_count_index,
2204 vdu_name,
2205 vnf_config_primitive,
2206 primitive_params,
2207 operationState,
2208 detailed_status,
2209 operationType,
2210 RO_nsr_id,
2211 RO_scaling_info)
2212 return self.SUBOPERATION_STATUS_NEW
2213 else:
2214 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2215 # or op_index (operationState != 'COMPLETED')
2216 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2217
2218 # Function to return execution_environment id
2219
2220 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2221 for vca in vca_deployed_list:
2222 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2223 return vca["ee_id"]
2224
2225 # Helper methods for terminate()
2226
async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
    """
    Execute the terminate primitives declared at the vnfds of the vnfs of this ns.
    Called from terminate() before deleting the instance.

    For each vnfr whose vnfd has "terminate-config-primitive", the primitives are executed ordered by
    'seq'. Each one is annotated as a sub-operation of the nslcmop and then executed with
    n2vc.exec_primitive at the execution environment obtained with _get_ee_id.
    :param db_nslcmop: nslcmop content. vdu_id, vdu_count_index and vdu_name are read from its
        "operationParams"
    :param nslcmop_id: operation id, only used for logging
    :param nsr_id: _id of the ns instance
    :return: None
    :raises LcmException: if the execution of a primitive fails
    """
    logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
    db_vnfds = {}  # cache of vnfd contents, to read each one from database only once
    # Loop over VNFRs
    for vnfr in db_vnfrs_list:
        vnfd_id = vnfr["vnfd-id"]
        vnf_index = vnfr["member-vnf-index-ref"]
        if vnfd_id not in db_vnfds:
            db_vnfds[vnfd_id] = self.db.get_one("vnfds", {"_id": vnfd_id})
        vnfd = db_vnfds[vnfd_id]
        if not self._has_terminate_config_primitive(vnfd):
            continue
        # Get the primitive's sorted sequence list
        seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
        for seq in seq_list:
            # For each sequence in list, get primitive and execute it
            step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                vnf_index, seq.get("name"))
            self.logger.debug(logging_text + step)
            # Create the primitive for each sequence, i.e. "primitive": "touch"
            primitive = seq.get('name')
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
            # Taken from the operation parameters; None when they are not provided
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            vdu_name = db_nslcmop["operationParams"].get("vdu_name")
            # Add sub-operation. Keyword arguments are used so that each value gets to its parameter;
            # nslcmop_id is not a parameter of _add_suboperation
            self._add_suboperation(db_nslcmop,
                                   vnf_index=vnf_index,
                                   vdu_id=vdu_id,
                                   vdu_count_index=vdu_count_index,
                                   vdu_name=vdu_name,
                                   primitive=primitive,
                                   mapped_primitive_params=mapped_primitive_params)

            ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
            try:
                await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=mapped_primitive_params
                )
            except Exception as e:
                self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
                raise LcmException(
                    "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
                    .format(vnf_index, seq.get("name"), e),
                ) from e
2298
async def _delete_N2VC(self, nsr_id: str):
    """
    Delete the N2VC namespace of this ns, writing the configuration status before and after.
    :param nsr_id: _id of the ns instance. The namespace deleted is "." followed by this id
    :return: None
    """
    # all the configuration status entries are marked as 'TERMINATING' while the namespace is deleted
    self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
    await self.n2vc.delete_namespace(namespace="." + nsr_id)
    # only reached if delete_namespace does not raise
    self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2304
2305 async def terminate(self, nsr_id, nslcmop_id):
2306
2307 # Try to lock HA task here
2308 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2309 if not task_is_locked_by_me:
2310 return
2311
2312 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2313 self.logger.debug(logging_text + "Enter")
2314 db_nsr = None
2315 db_nslcmop = None
2316 exc = None
2317 failed_detail = [] # annotates all failed error messages
2318 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2319 "_admin.current-operation": nslcmop_id,
2320 "_admin.operation-type": "terminate"}
2321 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2322 db_nslcmop_update = {}
2323 nslcmop_operation_state = None
2324 autoremove = False # autoremove after terminated
2325 pending_tasks = []
2326 try:
2327 # wait for any previous tasks in process
2328 step = "Waiting for previous operations to terminate"
2329 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2330
2331 self._write_ns_status(
2332 nsr_id=nsr_id,
2333 ns_state="TERMINATING",
2334 current_operation="TERMINATING",
2335 current_operation_id=nslcmop_id
2336 )
2337 self._write_op_status(
2338 op_id=nslcmop_id,
2339 queuePosition=0
2340 )
2341
2342 step = "Getting nslcmop={} from db".format(nslcmop_id)
2343 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2344 step = "Getting nsr={} from db".format(nsr_id)
2345 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2346 # nsd = db_nsr["nsd"]
2347 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2348 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2349 return
2350 # #TODO check if VIM is creating and wait
2351 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2352 # Call internal terminate action
2353 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2354
2355 pending_tasks = []
2356
2357 db_nsr_update["operational-status"] = "terminating"
2358 db_nsr_update["config-status"] = "terminating"
2359
2360 # remove NS
2361 try:
2362 step = "delete execution environment"
2363 self.logger.debug(logging_text + step)
2364
2365 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2366 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2367
2368 pending_tasks.append(task_delete_ee)
2369 except Exception as e:
2370 msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
2371 self.logger.error(msg)
2372 failed_detail.append(msg)
2373
2374 try:
2375 # Delete from k8scluster
2376 step = "delete kdus"
2377 self.logger.debug(logging_text + step)
2378 # print(nsr_deployed)
2379 if nsr_deployed:
2380 for kdu in nsr_deployed.get("K8s", ()):
2381 kdu_instance = kdu.get("kdu-instance")
2382 if not kdu_instance:
2383 continue
2384 if kdu.get("k8scluster-type") == "chart":
2385 task_delete_kdu_instance = asyncio.ensure_future(
2386 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2387 kdu_instance=kdu_instance))
2388 elif kdu.get("k8scluster-type") == "juju":
2389 task_delete_kdu_instance = asyncio.ensure_future(
2390 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2391 kdu_instance=kdu_instance))
2392 else:
2393 self.error(logging_text + "Unknown k8s deployment type {}".
2394 format(kdu.get("k8scluster-type")))
2395 continue
2396 pending_tasks.append(task_delete_kdu_instance)
2397 except LcmException as e:
2398 msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
2399 self.logger.error(msg)
2400 failed_detail.append(msg)
2401
2402 # remove from RO
2403 RO_fail = False
2404
2405 # Delete ns
2406 RO_nsr_id = RO_delete_action = None
2407 if nsr_deployed and nsr_deployed.get("RO"):
2408 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2409 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2410 try:
2411 if RO_nsr_id:
2412 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2413 "Deleting ns from VIM"
2414 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2415 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2416 self.logger.debug(logging_text + step)
2417 desc = await self.RO.delete("ns", RO_nsr_id)
2418 RO_delete_action = desc["action_id"]
2419 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2420 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2421 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2422 if RO_delete_action:
2423 # wait until NS is deleted from VIM
2424 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2425 format(RO_nsr_id, RO_delete_action)
2426 detailed_status_old = None
2427 self.logger.debug(logging_text + step)
2428
2429 delete_timeout = 20 * 60 # 20 minutes
2430 while delete_timeout > 0:
2431 desc = await self.RO.show(
2432 "ns",
2433 item_id_name=RO_nsr_id,
2434 extra_item="action",
2435 extra_item_id=RO_delete_action)
2436
2437 # deploymentStatus
2438 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2439
2440 ns_status, ns_status_info = self.RO.check_action_status(desc)
2441 if ns_status == "ERROR":
2442 raise ROclient.ROClientException(ns_status_info)
2443 elif ns_status == "BUILD":
2444 detailed_status = step + "; {}".format(ns_status_info)
2445 elif ns_status == "ACTIVE":
2446 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2447 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2448 break
2449 else:
2450 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2451 if detailed_status != detailed_status_old:
2452 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2453 db_nsr_update["detailed-status"] = detailed_status
2454 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2455 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2456 await asyncio.sleep(5, loop=self.loop)
2457 delete_timeout -= 5
2458 else: # delete_timeout <= 0:
2459 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2460
2461 except ROclient.ROClientException as e:
2462 if e.http_code == 404: # not found
2463 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2464 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2465 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2466 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2467 elif e.http_code == 409: # conflict
2468 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2469 self.logger.debug(logging_text + failed_detail[-1])
2470 RO_fail = True
2471 else:
2472 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2473 self.logger.error(logging_text + failed_detail[-1])
2474 RO_fail = True
2475
2476 # Delete nsd
2477 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2478 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2479 try:
2480 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2481 "Deleting nsd from RO"
2482 await self.RO.delete("nsd", RO_nsd_id)
2483 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2484 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2485 except ROclient.ROClientException as e:
2486 if e.http_code == 404: # not found
2487 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2488 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2489 elif e.http_code == 409: # conflict
2490 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2491 self.logger.debug(logging_text + failed_detail[-1])
2492 RO_fail = True
2493 else:
2494 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2495 self.logger.error(logging_text + failed_detail[-1])
2496 RO_fail = True
2497
2498 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2499 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2500 if not vnf_deployed or not vnf_deployed["id"]:
2501 continue
2502 try:
2503 RO_vnfd_id = vnf_deployed["id"]
2504 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2505 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2506 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2507 await self.RO.delete("vnfd", RO_vnfd_id)
2508 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2509 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2510 except ROclient.ROClientException as e:
2511 if e.http_code == 404: # not found
2512 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2513 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2514 elif e.http_code == 409: # conflict
2515 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2516 self.logger.debug(logging_text + failed_detail[-1])
2517 else:
2518 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2519 self.logger.error(logging_text + failed_detail[-1])
2520
2521 if failed_detail:
2522 terminate_ok = False
2523 self.logger.error(logging_text + " ;".join(failed_detail))
2524 db_nsr_update["operational-status"] = "failed"
2525 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2526 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2527 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2528 db_nslcmop_update["statusEnteredTime"] = time()
2529 else:
2530 terminate_ok = True
2531 db_nsr_update["operational-status"] = "terminated"
2532 db_nsr_update["detailed-status"] = "Done"
2533 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2534 db_nslcmop_update["detailed-status"] = "Done"
2535 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2536 db_nslcmop_update["statusEnteredTime"] = time()
2537 if db_nslcmop["operationParams"].get("autoremove"):
2538 autoremove = True
2539
2540 except (ROclient.ROClientException, DbException, LcmException) as e:
2541 self.logger.error(logging_text + "Exit Exception {}".format(e))
2542 exc = e
2543 except asyncio.CancelledError:
2544 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2545 exc = "Operation was cancelled"
2546 except Exception as e:
2547 exc = traceback.format_exc()
2548 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2549 finally:
2550 if exc and db_nslcmop:
2551 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2552 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2553 db_nslcmop_update["statusEnteredTime"] = time()
2554 try:
2555 if db_nslcmop and db_nslcmop_update:
2556 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2557 if db_nsr:
2558 db_nsr_update["_admin.nslcmop"] = None
2559 db_nsr_update["_admin.current-operation"] = None
2560 db_nsr_update["_admin.operation-type"] = None
2561 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2562
2563 if terminate_ok:
2564 ns_state = "IDLE"
2565 error_description = None
2566 error_detail = None
2567 else:
2568 ns_state = "BROKEN"
2569 error_detail = "; ".join(failed_detail)
2570 error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
2571 .format(nslcmop_id, step, error_detail)
2572
2573 self._write_ns_status(
2574 nsr_id=nsr_id,
2575 ns_state=ns_state,
2576 current_operation="IDLE",
2577 current_operation_id=None,
2578 error_description=error_description
2579 )
2580
2581 self._write_op_status(
2582 op_id=nslcmop_id,
2583 error_message=error_description
2584 )
2585
2586 except DbException as e:
2587 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2588 if nslcmop_operation_state:
2589 try:
2590 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2591 "operationState": nslcmop_operation_state,
2592 "autoremove": autoremove},
2593 loop=self.loop)
2594 except Exception as e:
2595 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2596
2597 # wait for pending tasks
2598 done = None
2599 pending = None
2600 if pending_tasks:
2601 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2602 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2603 if not pending:
2604 self.logger.debug(logging_text + 'All tasks finished...')
2605 else:
2606 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2607
2608 self.logger.debug(logging_text + "Exit")
2609 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2610
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the descriptor "value" (preferred) or "default-value" is used. If that descriptor value is between < > it is
    looked up at instantiation_params. Values provided by the user are never looked up this way.
    Whatever its origin, a dict/list/tuple value is dumped as one-line (flow style) yaml text, and a string value
    starting with "!!yaml " gets that prefix removed.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as empty
    :param instantiation_params: Instantiation params provided by user. None is treated as empty
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither a user value nor a descriptor value, or if its <reference>
        is not present at instantiation_params
    """
    # Callers may pass None when nothing was supplied; membership tests below need a container
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter") or ():
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" takes precedence over "default-value" when both are present
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                # "<name>" is a reference to an instantiation parameter
                if value[1:-1] not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[value[1:-1]]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]  # 7 == len("!!yaml ")
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
2653
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
    """
    Looks for the VCA record of db_deployed["VCA"] that matches the target and executes the primitive at its
    execution environment by means of self.n2vc.exec_primitive.
    :param db_deployed: dictionary with a "VCA" list of deployed charm records
    :param member_vnf_index: compared against the "member-vnf-index" of each VCA record
    :param vdu_id: compared against the "vdu_id" of each VCA record
    :param vdu_name: compared against "vdu_name" only when it is not empty
    :param vdu_count_index: compared against "vdu_count_index" only when it is truthy (so 0 is not compared)
    :param primitive: name of the primitive. For "config" the params are sent wrapped as {"params": ...}
    :param primitive_params: dictionary of parameters for the primitive
    :param retries: number of additional attempts when the execution raises
    :param retries_interval: seconds to wait between attempts
    :return: tuple (text, status): (output of the primitive, 'OK') or (error text, 'FAIL').
        Any Exception is reported as 'FAIL' instead of being raised
    """

    # find vca_deployed record for this action
    try:
        for vca_deployed in db_deployed["VCA"]:
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            # vca_deployed not found
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        # get ee_id
        ee_id = vca_deployed.get("ee_id")
        if not ee_id:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "execution environment"
                               .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        if primitive == "config":
            primitive_params = {"params": primitive_params}

        while retries >= 0:
            try:
                output = await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=primitive_params
                )
                # execution was OK
                break
            except Exception as e:
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry. The 'loop' argument of asyncio.sleep is not passed: it was removed at
                    # python 3.10, where the resulting TypeError would be reported as FAIL by the outer handler
                    # without ever retrying
                    await asyncio.sleep(retries_interval)
                else:
                    return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'

        return output, 'OK'

    except Exception as e:
        # errors locating the charm (and any other unexpected one) are reported, not raised
        return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2706
async def action(self, nsr_id, nslcmop_id):
    """
    Executes the primitive requested by the operation nslcmop_id over the ns nsr_id.
    The target is taken from the operationParams of the nslcmops record: a KDU when "kdu_name" is present
    (primitives "upgrade", "rollback" and "status", run by the helm or juju k8s connector), otherwise the
    charm of the vdu, vnf or ns, whose primitive must be declared at the corresponding
    [ns|vnf|vdu]-configuration:config-primitive.
    The result is written at the nslcmops record, the nsr is set back to IDLE and, when there is a final
    operation state, an "actioned" message is sent to the "ns" topic.
    :param nsr_id: _id of the record at nsrs
    :param nslcmop_id: _id of the record at nslcmops that describes the action
    :return: tuple (operationState, detailed-status), or None if the HA lock is not obtained
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")  # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if not kdur:
                # fail with a clear message instead of an AttributeError over None
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            index = 0
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if kdu_name == kdu["kdu-name"]:
                    db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                               "path": "_admin.deployed.K8s.{}".format(index)}
                    if primitive == "upgrade":
                        if desc_params.get("kdu_model"):
                            kdu_model = desc_params.get("kdu_model")
                            del desc_params["kdu_model"]
                        else:
                            kdu_model = kdu.get("kdu-model")
                            # a deployed "name:xxx" model is reduced to "name"
                            parts = kdu_model.split(sep=":")
                            if len(parts) == 2:
                                kdu_model = parts[0]

                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)

                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)

                        self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                        break
                    elif primitive == "rollback":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                    elif primitive == "status":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                index += 1

            else:
                # the loop ended without executing any of the primitives above
                raise LcmException("KDU '{}' not found".format(kdu_name))
            # The final state is also stored at nslcmop_operation_state[_detail]: the finally clause needs them
            # to send the "actioned" notification and to build the returned value
            if output:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = output
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'COMPLETED'
            else:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = ''
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'FAILED'
            db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    # fail with a clear message instead of an AttributeError over None
                    raise LcmException("vdu_id={} not found at vnfr member_vnf_index={}".format(vdu_id, vnf_index))
                if vdur.get("additionalParams"):
                    # params of the vdu replace (not merge with) the ones of the vnf
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
                if exc:
                    self._write_op_status(
                        op_id=nslcmop_id,
                        error_message=nslcmop_operation_state_detail
                    )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the value of the bare 'return' statements of the try clause
        return nslcmop_operation_state, nslcmop_operation_state_detail
2955
2956 async def scale(self, nsr_id, nslcmop_id):
2957
2958 # Try to lock HA task here
2959 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2960 if not task_is_locked_by_me:
2961 return
2962
2963 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2964 self.logger.debug(logging_text + "Enter")
2965 # get all needed from database
2966 db_nsr = None
2967 db_nslcmop = None
2968 db_nslcmop_update = {}
2969 nslcmop_operation_state = None
2970 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2971 "_admin.current-operation": nslcmop_id,
2972 "_admin.operation-type": "scale"}
2973 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2974 exc = None
2975 # in case of error, indicates what part of scale was failed to put nsr at error status
2976 scale_process = None
2977 old_operational_status = ""
2978 old_config_status = ""
2979 vnfr_scaled = False
2980 try:
2981 # wait for any previous tasks in process
2982 step = "Waiting for previous operations to terminate"
2983 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2984
2985 self._write_ns_status(
2986 nsr_id=nsr_id,
2987 ns_state=None,
2988 current_operation="SCALING",
2989 current_operation_id=nslcmop_id
2990 )
2991
2992 step = "Getting nslcmop from database"
2993 self.logger.debug(step + " after having waited for previous tasks to be completed")
2994 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2995 step = "Getting nsr from database"
2996 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2997
2998 old_operational_status = db_nsr["operational-status"]
2999 old_config_status = db_nsr["config-status"]
3000 step = "Parsing scaling parameters"
3001 # self.logger.debug(step)
3002 db_nsr_update["operational-status"] = "scaling"
3003 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3004 nsr_deployed = db_nsr["_admin"].get("deployed")
3005
3006 #######
3007 nsr_deployed = db_nsr["_admin"].get("deployed")
3008 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3009 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3010 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3011 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3012 #######
3013
3014 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3015 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3016 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3017 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3018 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3019
3020 # for backward compatibility
3021 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3022 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3023 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3024 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3025
3026 step = "Getting vnfr from database"
3027 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3028 step = "Getting vnfd from database"
3029 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3030
3031 step = "Getting scaling-group-descriptor"
3032 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3033 if scaling_descriptor["name"] == scaling_group:
3034 break
3035 else:
3036 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3037 "at vnfd:scaling-group-descriptor".format(scaling_group))
3038
3039 # cooldown_time = 0
3040 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3041 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3042 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3043 # break
3044
3045 # TODO check if ns is in a proper status
3046 step = "Sending scale order to VIM"
3047 nb_scale_op = 0
3048 if not db_nsr["_admin"].get("scaling-group"):
3049 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3050 admin_scale_index = 0
3051 else:
3052 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3053 if admin_scale_info["name"] == scaling_group:
3054 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3055 break
3056 else: # not found, set index one plus last element and add new entry with the name
3057 admin_scale_index += 1
3058 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3059 RO_scaling_info = []
3060 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3061 if scaling_type == "SCALE_OUT":
3062 # count if max-instance-count is reached
3063 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3064 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3065 if nb_scale_op >= max_instance_count:
3066 raise LcmException("reached the limit of {} (max-instance-count) "
3067 "scaling-out operations for the "
3068 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3069
3070 nb_scale_op += 1
3071 vdu_scaling_info["scaling_direction"] = "OUT"
3072 vdu_scaling_info["vdu-create"] = {}
3073 for vdu_scale_info in scaling_descriptor["vdu"]:
3074 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3075 "type": "create", "count": vdu_scale_info.get("count", 1)})
3076 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3077
3078 elif scaling_type == "SCALE_IN":
3079 # count if min-instance-count is reached
3080 min_instance_count = 0
3081 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3082 min_instance_count = int(scaling_descriptor["min-instance-count"])
3083 if nb_scale_op <= min_instance_count:
3084 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3085 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3086 nb_scale_op -= 1
3087 vdu_scaling_info["scaling_direction"] = "IN"
3088 vdu_scaling_info["vdu-delete"] = {}
3089 for vdu_scale_info in scaling_descriptor["vdu"]:
3090 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3091 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3092 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3093
3094 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3095 vdu_create = vdu_scaling_info.get("vdu-create")
3096 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3097 if vdu_scaling_info["scaling_direction"] == "IN":
3098 for vdur in reversed(db_vnfr["vdur"]):
3099 if vdu_delete.get(vdur["vdu-id-ref"]):
3100 vdu_delete[vdur["vdu-id-ref"]] -= 1
3101 vdu_scaling_info["vdu"].append({
3102 "name": vdur["name"],
3103 "vdu_id": vdur["vdu-id-ref"],
3104 "interface": []
3105 })
3106 for interface in vdur["interfaces"]:
3107 vdu_scaling_info["vdu"][-1]["interface"].append({
3108 "name": interface["name"],
3109 "ip_address": interface["ip-address"],
3110 "mac_address": interface.get("mac-address"),
3111 })
3112 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3113
3114 # PRE-SCALE BEGIN
3115 step = "Executing pre-scale vnf-config-primitive"
3116 if scaling_descriptor.get("scaling-config-action"):
3117 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3118 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3119 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3120 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3121 step = db_nslcmop_update["detailed-status"] = \
3122 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3123
3124 # look for primitive
3125 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3126 if config_primitive["name"] == vnf_config_primitive:
3127 break
3128 else:
3129 raise LcmException(
3130 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3131 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3132 "primitive".format(scaling_group, config_primitive))
3133
3134 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3135 if db_vnfr.get("additionalParamsForVnf"):
3136 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3137
3138 scale_process = "VCA"
3139 db_nsr_update["config-status"] = "configuring pre-scaling"
3140 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3141
3142 # Pre-scale reintent check: Check if this sub-operation has been executed before
3143 op_index = self._check_or_add_scale_suboperation(
3144 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3145 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3146 # Skip sub-operation
3147 result = 'COMPLETED'
3148 result_detail = 'Done'
3149 self.logger.debug(logging_text +
3150 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3151 vnf_config_primitive, result, result_detail))
3152 else:
3153 if (op_index == self.SUBOPERATION_STATUS_NEW):
3154 # New sub-operation: Get index of this sub-operation
3155 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3156 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3157 format(vnf_config_primitive))
3158 else:
3159 # Reintent: Get registered params for this existing sub-operation
3160 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3161 vnf_index = op.get('member_vnf_index')
3162 vnf_config_primitive = op.get('primitive')
3163 primitive_params = op.get('primitive_params')
3164 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3165 format(vnf_config_primitive))
3166 # Execute the primitive, either with new (first-time) or registered (reintent) args
3167 result, result_detail = await self._ns_execute_primitive(
3168 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3169 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3170 vnf_config_primitive, result, result_detail))
3171 # Update operationState = COMPLETED | FAILED
3172 self._update_suboperation_status(
3173 db_nslcmop, op_index, result, result_detail)
3174
3175 if result == "FAILED":
3176 raise LcmException(result_detail)
3177 db_nsr_update["config-status"] = old_config_status
3178 scale_process = None
3179 # PRE-SCALE END
3180
3181 # SCALE RO - BEGIN
3182 # Should this block be skipped if 'RO_nsr_id' == None ?
3183 # if (RO_nsr_id and RO_scaling_info):
3184 if RO_scaling_info:
3185 scale_process = "RO"
3186 # Scale RO retry check: check whether this sub-operation has already been executed
3187 op_index = self._check_or_add_scale_suboperation(
3188 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3189 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3190 # Skip sub-operation
3191 result = 'COMPLETED'
3192 result_detail = 'Done'
3193 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3194 result, result_detail))
3195 else:
3196 if (op_index == self.SUBOPERATION_STATUS_NEW):
3197 # New sub-operation: Get index of this sub-operation
3198 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3199 self.logger.debug(logging_text + "New sub-operation RO")
3200 else:
3201 # Retry: get the registered params for this existing sub-operation
3202 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3203 RO_nsr_id = op.get('RO_nsr_id')
3204 RO_scaling_info = op.get('RO_scaling_info')
3205 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3206 vnf_config_primitive))
3207
3208 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3209 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3210 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3211 # wait until ready
3212 RO_nslcmop_id = RO_desc["instance_action_id"]
3213 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3214
3215 RO_task_done = False
3216 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3217 detailed_status_old = None
3218 self.logger.debug(logging_text + step)
3219
3220 deployment_timeout = 1 * 3600 # One hour
3221 while deployment_timeout > 0:
3222 if not RO_task_done:
3223 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3224 extra_item_id=RO_nslcmop_id)
3225
3226 # deploymentStatus
3227 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3228
3229 ns_status, ns_status_info = self.RO.check_action_status(desc)
3230 if ns_status == "ERROR":
3231 raise ROclient.ROClientException(ns_status_info)
3232 elif ns_status == "BUILD":
3233 detailed_status = step + "; {}".format(ns_status_info)
3234 elif ns_status == "ACTIVE":
3235 RO_task_done = True
3236 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3237 self.logger.debug(logging_text + step)
3238 else:
3239 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3240 else:
3241
3242 if ns_status == "ERROR":
3243 raise ROclient.ROClientException(ns_status_info)
3244 elif ns_status == "BUILD":
3245 detailed_status = step + "; {}".format(ns_status_info)
3246 elif ns_status == "ACTIVE":
3247 step = detailed_status = \
3248 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3249 if not vnfr_scaled:
3250 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3251 vnfr_scaled = True
3252 try:
3253 desc = await self.RO.show("ns", RO_nsr_id)
3254
3255 # deploymentStatus
3256 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3257
3258 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3259 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3260 break
3261 except LcmExceptionNoMgmtIP:
3262 pass
3263 else:
3264 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3265 if detailed_status != detailed_status_old:
3266 self._update_suboperation_status(
3267 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3268 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3269 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3270
3271 await asyncio.sleep(5, loop=self.loop)
3272 deployment_timeout -= 5
3273 if deployment_timeout <= 0:
3274 self._update_suboperation_status(
3275 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3276 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3277
3278 # update VDU_SCALING_INFO with the obtained ip_addresses
3279 if vdu_scaling_info["scaling_direction"] == "OUT":
3280 for vdur in reversed(db_vnfr["vdur"]):
3281 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3282 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3283 vdu_scaling_info["vdu"].append({
3284 "name": vdur["name"],
3285 "vdu_id": vdur["vdu-id-ref"],
3286 "interface": []
3287 })
3288 for interface in vdur["interfaces"]:
3289 vdu_scaling_info["vdu"][-1]["interface"].append({
3290 "name": interface["name"],
3291 "ip_address": interface["ip-address"],
3292 "mac_address": interface.get("mac-address"),
3293 })
3294 del vdu_scaling_info["vdu-create"]
3295
3296 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3297 # SCALE RO - END
3298
3299 scale_process = None
3300 if db_nsr_update:
3301 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3302
3303 # POST-SCALE BEGIN
3304 # execute primitive service POST-SCALING
3305 step = "Executing post-scale vnf-config-primitive"
3306 if scaling_descriptor.get("scaling-config-action"):
3307 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3308 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3309 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3310 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3311 step = db_nslcmop_update["detailed-status"] = \
3312 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3313
3314 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3315 if db_vnfr.get("additionalParamsForVnf"):
3316 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3317
3318 # look for primitive
3319 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3320 if config_primitive["name"] == vnf_config_primitive:
3321 break
3322 else:
3323 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3324 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3325 "match any vnf-configuration:config-primitive".format(scaling_group,
3326 config_primitive))
3327 scale_process = "VCA"
3328 db_nsr_update["config-status"] = "configuring post-scaling"
3329 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3330
3331 # Post-scale retry check: check whether this sub-operation has already been executed
3332 op_index = self._check_or_add_scale_suboperation(
3333 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3334 if op_index == self.SUBOPERATION_STATUS_SKIP:
3335 # Skip sub-operation
3336 result = 'COMPLETED'
3337 result_detail = 'Done'
3338 self.logger.debug(logging_text +
3339 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3340 format(vnf_config_primitive, result, result_detail))
3341 else:
3342 if op_index == self.SUBOPERATION_STATUS_NEW:
3343 # New sub-operation: Get index of this sub-operation
3344 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3345 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3346 format(vnf_config_primitive))
3347 else:
3348 # Retry: get the registered params for this existing sub-operation
3349 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3350 vnf_index = op.get('member_vnf_index')
3351 vnf_config_primitive = op.get('primitive')
3352 primitive_params = op.get('primitive_params')
3353 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3354 format(vnf_config_primitive))
3355 # Execute the primitive, either with new (first-time) or registered (retry) args
3356 result, result_detail = await self._ns_execute_primitive(
3357 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3358 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3359 vnf_config_primitive, result, result_detail))
3360 # Update operationState = COMPLETED | FAILED
3361 self._update_suboperation_status(
3362 db_nslcmop, op_index, result, result_detail)
3363
3364 if result == "FAILED":
3365 raise LcmException(result_detail)
3366 db_nsr_update["config-status"] = old_config_status
3367 scale_process = None
3368 # POST-SCALE END
3369
3370 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3371 db_nslcmop_update["statusEnteredTime"] = time()
3372 db_nslcmop_update["detailed-status"] = "done"
3373 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3374 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3375 else old_operational_status
3376 db_nsr_update["config-status"] = old_config_status
3377 return
3378 except (ROclient.ROClientException, DbException, LcmException) as e:
3379 self.logger.error(logging_text + "Exit Exception {}".format(e))
3380 exc = e
3381 except asyncio.CancelledError:
3382 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3383 exc = "Operation was cancelled"
3384 except Exception as e:
3385 exc = traceback.format_exc()
3386 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3387 finally:
3388 self._write_ns_status(
3389 nsr_id=nsr_id,
3390 ns_state=None,
3391 current_operation="IDLE",
3392 current_operation_id=None
3393 )
3394 if exc:
3395 if db_nslcmop:
3396 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3397 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3398 db_nslcmop_update["statusEnteredTime"] = time()
3399 if db_nsr:
3400 db_nsr_update["operational-status"] = old_operational_status
3401 db_nsr_update["config-status"] = old_config_status
3402 db_nsr_update["detailed-status"] = ""
3403 db_nsr_update["_admin.nslcmop"] = None
3404 if scale_process:
3405 if "VCA" in scale_process:
3406 db_nsr_update["config-status"] = "failed"
3407 if "RO" in scale_process:
3408 db_nsr_update["operational-status"] = "failed"
3409 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3410 exc)
3411 try:
3412 if db_nslcmop and db_nslcmop_update:
3413 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3414 if db_nsr:
3415 db_nsr_update["_admin.current-operation"] = None
3416 db_nsr_update["_admin.operation-type"] = None
3417 db_nsr_update["_admin.nslcmop"] = None
3418 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3419
3420 self._write_ns_status(
3421 nsr_id=nsr_id,
3422 ns_state=None,
3423 current_operation="IDLE",
3424 current_operation_id=None
3425 )
3426
3427 except DbException as e:
3428 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3429 if nslcmop_operation_state:
3430 try:
3431 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3432 "operationState": nslcmop_operation_state},
3433 loop=self.loop)
3434 # if cooldown_time:
3435 # await asyncio.sleep(cooldown_time, loop=self.loop)
3436 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3437 except Exception as e:
3438 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3439 self.logger.debug(logging_text + "Exit")
3440 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")