9bc0dba0ca34eb37847348dffa6de781396982f0
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
47 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
48 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
49 timeout_charm_delete = 10 * 60
50 timeout_primitive = 10 * 60 # timeout for primitive execution
51
52 SUBOPERATION_STATUS_NOT_FOUND = -1
53 SUBOPERATION_STATUS_NEW = -2
54 SUBOPERATION_STATUS_SKIP = -3
55
56 def __init__(self, db, msg, fs, lcm_tasks, config, loop):
57 """
58 Init, Connect to database, filesystem storage, and messaging
59 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
60 :return: None
61 """
62 super().__init__(
63 db=db,
64 msg=msg,
65 fs=fs,
66 logger=logging.getLogger('lcm.ns')
67 )
68
69 self.loop = loop
70 self.lcm_tasks = lcm_tasks
71 self.timeout = config["timeout"]
72 self.ro_config = config["ro_config"]
73 self.vca_config = config["VCA"].copy()
74
75 # create N2VC connector
76 self.n2vc = N2VCJujuConnector(
77 db=self.db,
78 fs=self.fs,
79 log=self.logger,
80 loop=self.loop,
81 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
82 username=self.vca_config.get('user', None),
83 vca_config=self.vca_config,
84 on_update_db=self._on_update_n2vc_db
85 )
86
87 self.k8sclusterhelm = K8sHelmConnector(
88 kubectl_command=self.vca_config.get("kubectlpath"),
89 helm_command=self.vca_config.get("helmpath"),
90 fs=self.fs,
91 log=self.logger,
92 db=self.db,
93 on_update_db=None,
94 )
95
96 self.k8sclusterjuju = K8sJujuConnector(
97 kubectl_command=self.vca_config.get("kubectlpath"),
98 juju_command=self.vca_config.get("jujupath"),
99 fs=self.fs,
100 log=self.logger,
101 db=self.db,
102 on_update_db=None,
103 )
104
105 # create RO client
106 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
107
108 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
109
110 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
111
112 try:
113 # TODO filter RO descriptor fields...
114
115 # write to database
116 db_dict = dict()
117 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
118 db_dict['deploymentStatus'] = ro_descriptor
119 self.update_db_2("nsrs", nsrs_id, db_dict)
120
121 except Exception as e:
122 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
124 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
125
126 # remove last dot from path (if exists)
127 if path.endswith('.'):
128 path = path[:-1]
129
130 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
131 # .format(table, filter, path, updated_data))
132
133 try:
134
135 nsr_id = filter.get('_id')
136
137 # read ns record from database
138 nsr = self.db.get_one(table='nsrs', q_filter=filter)
139 current_ns_status = nsr.get('nsState')
140
141 # get vca status for NS
142 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
143
144 # vcaStatus
145 db_dict = dict()
146 db_dict['vcaStatus'] = status_dict
147
148 # update configurationStatus for this VCA
149 try:
150 vca_index = int(path[path.rfind(".")+1:])
151
152 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
153 vca_status = vca_list[vca_index].get('status')
154
155 configuration_status_list = nsr.get('configurationStatus')
156 config_status = configuration_status_list[vca_index].get('status')
157
158 if config_status == 'BROKEN' and vca_status != 'failed':
159 db_dict['configurationStatus'][vca_index] = 'READY'
160 elif config_status != 'BROKEN' and vca_status == 'failed':
161 db_dict['configurationStatus'][vca_index] = 'BROKEN'
162 except Exception as e:
163 # not update configurationStatus
164 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
165
166 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
167 # if nsState = 'DEGRADED' check if all is OK
168 is_degraded = False
169 if current_ns_status in ('READY', 'DEGRADED'):
170 error_description = ''
171 # check machines
172 if status_dict.get('machines'):
173 for machine_id in status_dict.get('machines'):
174 machine = status_dict.get('machines').get(machine_id)
175 # check machine agent-status
176 if machine.get('agent-status'):
177 s = machine.get('agent-status').get('status')
178 if s != 'started':
179 is_degraded = True
180 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
181 # check machine instance status
182 if machine.get('instance-status'):
183 s = machine.get('instance-status').get('status')
184 if s != 'running':
185 is_degraded = True
186 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
187 # check applications
188 if status_dict.get('applications'):
189 for app_id in status_dict.get('applications'):
190 app = status_dict.get('applications').get(app_id)
191 # check application status
192 if app.get('status'):
193 s = app.get('status').get('status')
194 if s != 'active':
195 is_degraded = True
196 error_description += 'application {} status={} ; '.format(app_id, s)
197
198 if error_description:
199 db_dict['errorDescription'] = error_description
200 if current_ns_status == 'READY' and is_degraded:
201 db_dict['nsState'] = 'DEGRADED'
202 if current_ns_status == 'DEGRADED' and not is_degraded:
203 db_dict['nsState'] = 'READY'
204
205 # write to database
206 self.update_db_2("nsrs", nsr_id, db_dict)
207
208 except Exception as e:
209 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
210
211 return
212
213 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
214 """
215 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
216 :param vnfd: input vnfd
217 :param new_id: overrides vnf id if provided
218 :param additionalParams: Instantiation params for VNFs provided
219 :param nsrId: Id of the NSR
220 :return: copy of vnfd
221 """
222 try:
223 vnfd_RO = deepcopy(vnfd)
224 # remove unused by RO configuration, monitoring, scaling and internal keys
225 vnfd_RO.pop("_id", None)
226 vnfd_RO.pop("_admin", None)
227 vnfd_RO.pop("vnf-configuration", None)
228 vnfd_RO.pop("monitoring-param", None)
229 vnfd_RO.pop("scaling-group-descriptor", None)
230 vnfd_RO.pop("kdu", None)
231 vnfd_RO.pop("k8s-cluster", None)
232 if new_id:
233 vnfd_RO["id"] = new_id
234
235 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
236 for vdu in get_iterable(vnfd_RO, "vdu"):
237 cloud_init_file = None
238 if vdu.get("cloud-init-file"):
239 base_folder = vnfd["_admin"]["storage"]
240 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
241 vdu["cloud-init-file"])
242 with self.fs.file_open(cloud_init_file, "r") as ci_file:
243 cloud_init_content = ci_file.read()
244 vdu.pop("cloud-init-file", None)
245 elif vdu.get("cloud-init"):
246 cloud_init_content = vdu["cloud-init"]
247 else:
248 continue
249
250 env = Environment()
251 ast = env.parse(cloud_init_content)
252 mandatory_vars = meta.find_undeclared_variables(ast)
253 if mandatory_vars:
254 for var in mandatory_vars:
255 if not additionalParams or var not in additionalParams.keys():
256 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
257 "file, must be provided in the instantiation parameters inside the "
258 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
259 template = Template(cloud_init_content)
260 cloud_init_content = template.render(additionalParams or {})
261 vdu["cloud-init"] = cloud_init_content
262
263 return vnfd_RO
264 except FsException as e:
265 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
266 format(vnfd["id"], vdu["id"], cloud_init_file, e))
267 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
268 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
269 format(vnfd["id"], vdu["id"], e))
270
271 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
272 """
273 Creates a RO ns descriptor from OSM ns_instantiate params
274 :param ns_params: OSM instantiate params
275 :return: The RO ns descriptor
276 """
277 vim_2_RO = {}
278 wim_2_RO = {}
279 # TODO feature 1417: Check that no instantiation is set over PDU
280 # check if PDU forces a concrete vim-network-id and add it
281 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
282
283 def vim_account_2_RO(vim_account):
284 if vim_account in vim_2_RO:
285 return vim_2_RO[vim_account]
286
287 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
288 if db_vim["_admin"]["operationalState"] != "ENABLED":
289 raise LcmException("VIM={} is not available. operationalState={}".format(
290 vim_account, db_vim["_admin"]["operationalState"]))
291 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
292 vim_2_RO[vim_account] = RO_vim_id
293 return RO_vim_id
294
295 def wim_account_2_RO(wim_account):
296 if isinstance(wim_account, str):
297 if wim_account in wim_2_RO:
298 return wim_2_RO[wim_account]
299
300 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
301 if db_wim["_admin"]["operationalState"] != "ENABLED":
302 raise LcmException("WIM={} is not available. operationalState={}".format(
303 wim_account, db_wim["_admin"]["operationalState"]))
304 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
305 wim_2_RO[wim_account] = RO_wim_id
306 return RO_wim_id
307 else:
308 return wim_account
309
310 def ip_profile_2_RO(ip_profile):
311 RO_ip_profile = deepcopy((ip_profile))
312 if "dns-server" in RO_ip_profile:
313 if isinstance(RO_ip_profile["dns-server"], list):
314 RO_ip_profile["dns-address"] = []
315 for ds in RO_ip_profile.pop("dns-server"):
316 RO_ip_profile["dns-address"].append(ds['address'])
317 else:
318 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
319 if RO_ip_profile.get("ip-version") == "ipv4":
320 RO_ip_profile["ip-version"] = "IPv4"
321 if RO_ip_profile.get("ip-version") == "ipv6":
322 RO_ip_profile["ip-version"] = "IPv6"
323 if "dhcp-params" in RO_ip_profile:
324 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
325 return RO_ip_profile
326
327 if not ns_params:
328 return None
329 RO_ns_params = {
330 # "name": ns_params["nsName"],
331 # "description": ns_params.get("nsDescription"),
332 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
333 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
334 # "scenario": ns_params["nsdId"],
335 }
336
337 n2vc_key_list = n2vc_key_list or []
338 for vnfd_ref, vnfd in vnfd_dict.items():
339 vdu_needed_access = []
340 mgmt_cp = None
341 if vnfd.get("vnf-configuration"):
342 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
343 if ssh_required and vnfd.get("mgmt-interface"):
344 if vnfd["mgmt-interface"].get("vdu-id"):
345 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
346 elif vnfd["mgmt-interface"].get("cp"):
347 mgmt_cp = vnfd["mgmt-interface"]["cp"]
348
349 for vdu in vnfd.get("vdu", ()):
350 if vdu.get("vdu-configuration"):
351 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
352 if ssh_required:
353 vdu_needed_access.append(vdu["id"])
354 elif mgmt_cp:
355 for vdu_interface in vdu.get("interface"):
356 if vdu_interface.get("external-connection-point-ref") and \
357 vdu_interface["external-connection-point-ref"] == mgmt_cp:
358 vdu_needed_access.append(vdu["id"])
359 mgmt_cp = None
360 break
361
362 if vdu_needed_access:
363 for vnf_member in nsd.get("constituent-vnfd"):
364 if vnf_member["vnfd-id-ref"] != vnfd_ref:
365 continue
366 for vdu in vdu_needed_access:
367 populate_dict(RO_ns_params,
368 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
369 n2vc_key_list)
370
371 if ns_params.get("vduImage"):
372 RO_ns_params["vduImage"] = ns_params["vduImage"]
373
374 if ns_params.get("ssh_keys"):
375 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
376 for vnf_params in get_iterable(ns_params, "vnf"):
377 for constituent_vnfd in nsd["constituent-vnfd"]:
378 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
379 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
380 break
381 else:
382 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
383 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
384 if vnf_params.get("vimAccountId"):
385 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
386 vim_account_2_RO(vnf_params["vimAccountId"]))
387
388 for vdu_params in get_iterable(vnf_params, "vdu"):
389 # TODO feature 1417: check that this VDU exist and it is not a PDU
390 if vdu_params.get("volume"):
391 for volume_params in vdu_params["volume"]:
392 if volume_params.get("vim-volume-id"):
393 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
394 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
395 volume_params["vim-volume-id"])
396 if vdu_params.get("interface"):
397 for interface_params in vdu_params["interface"]:
398 if interface_params.get("ip-address"):
399 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
400 vdu_params["id"], "interfaces", interface_params["name"],
401 "ip_address"),
402 interface_params["ip-address"])
403 if interface_params.get("mac-address"):
404 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
405 vdu_params["id"], "interfaces", interface_params["name"],
406 "mac_address"),
407 interface_params["mac-address"])
408 if interface_params.get("floating-ip-required"):
409 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
410 vdu_params["id"], "interfaces", interface_params["name"],
411 "floating-ip"),
412 interface_params["floating-ip-required"])
413
414 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
415 if internal_vld_params.get("vim-network-name"):
416 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
417 internal_vld_params["name"], "vim-network-name"),
418 internal_vld_params["vim-network-name"])
419 if internal_vld_params.get("vim-network-id"):
420 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
421 internal_vld_params["name"], "vim-network-id"),
422 internal_vld_params["vim-network-id"])
423 if internal_vld_params.get("ip-profile"):
424 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
425 internal_vld_params["name"], "ip-profile"),
426 ip_profile_2_RO(internal_vld_params["ip-profile"]))
427 if internal_vld_params.get("provider-network"):
428
429 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
430 internal_vld_params["name"], "provider-network"),
431 internal_vld_params["provider-network"].copy())
432
433 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
434 # look for interface
435 iface_found = False
436 for vdu_descriptor in vnf_descriptor["vdu"]:
437 for vdu_interface in vdu_descriptor["interface"]:
438 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
439 if icp_params.get("ip-address"):
440 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
441 vdu_descriptor["id"], "interfaces",
442 vdu_interface["name"], "ip_address"),
443 icp_params["ip-address"])
444
445 if icp_params.get("mac-address"):
446 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
447 vdu_descriptor["id"], "interfaces",
448 vdu_interface["name"], "mac_address"),
449 icp_params["mac-address"])
450 iface_found = True
451 break
452 if iface_found:
453 break
454 else:
455 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
456 "internal-vld:id-ref={} is not present at vnfd:internal-"
457 "connection-point".format(vnf_params["member-vnf-index"],
458 icp_params["id-ref"]))
459
460 for vld_params in get_iterable(ns_params, "vld"):
461 if "ip-profile" in vld_params:
462 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
463 ip_profile_2_RO(vld_params["ip-profile"]))
464
465 if vld_params.get("provider-network"):
466
467 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
468 vld_params["provider-network"].copy())
469
470 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
471 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
472 wim_account_2_RO(vld_params["wimAccountId"])),
473 if vld_params.get("vim-network-name"):
474 RO_vld_sites = []
475 if isinstance(vld_params["vim-network-name"], dict):
476 for vim_account, vim_net in vld_params["vim-network-name"].items():
477 RO_vld_sites.append({
478 "netmap-use": vim_net,
479 "datacenter": vim_account_2_RO(vim_account)
480 })
481 else: # isinstance str
482 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
483 if RO_vld_sites:
484 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
485
486 if vld_params.get("vim-network-id"):
487 RO_vld_sites = []
488 if isinstance(vld_params["vim-network-id"], dict):
489 for vim_account, vim_net in vld_params["vim-network-id"].items():
490 RO_vld_sites.append({
491 "netmap-use": vim_net,
492 "datacenter": vim_account_2_RO(vim_account)
493 })
494 else: # isinstance str
495 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
496 if RO_vld_sites:
497 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
498 if vld_params.get("ns-net"):
499 if isinstance(vld_params["ns-net"], dict):
500 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
501 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
502 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
503 if "vnfd-connection-point-ref" in vld_params:
504 for cp_params in vld_params["vnfd-connection-point-ref"]:
505 # look for interface
506 for constituent_vnfd in nsd["constituent-vnfd"]:
507 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
508 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
509 break
510 else:
511 raise LcmException(
512 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
513 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
514 match_cp = False
515 for vdu_descriptor in vnf_descriptor["vdu"]:
516 for interface_descriptor in vdu_descriptor["interface"]:
517 if interface_descriptor.get("external-connection-point-ref") == \
518 cp_params["vnfd-connection-point-ref"]:
519 match_cp = True
520 break
521 if match_cp:
522 break
523 else:
524 raise LcmException(
525 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
526 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
527 cp_params["member-vnf-index-ref"],
528 cp_params["vnfd-connection-point-ref"],
529 vnf_descriptor["id"]))
530 if cp_params.get("ip-address"):
531 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
532 vdu_descriptor["id"], "interfaces",
533 interface_descriptor["name"], "ip_address"),
534 cp_params["ip-address"])
535 if cp_params.get("mac-address"):
536 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
537 vdu_descriptor["id"], "interfaces",
538 interface_descriptor["name"], "mac_address"),
539 cp_params["mac-address"])
540 return RO_ns_params
541
542 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
543 # make a copy to do not change
544 vdu_create = copy(vdu_create)
545 vdu_delete = copy(vdu_delete)
546
547 vdurs = db_vnfr.get("vdur")
548 if vdurs is None:
549 vdurs = []
550 vdu_index = len(vdurs)
551 while vdu_index:
552 vdu_index -= 1
553 vdur = vdurs[vdu_index]
554 if vdur.get("pdu-type"):
555 continue
556 vdu_id_ref = vdur["vdu-id-ref"]
557 if vdu_create and vdu_create.get(vdu_id_ref):
558 for index in range(0, vdu_create[vdu_id_ref]):
559 vdur = deepcopy(vdur)
560 vdur["_id"] = str(uuid4())
561 vdur["count-index"] += 1
562 vdurs.insert(vdu_index+1+index, vdur)
563 del vdu_create[vdu_id_ref]
564 if vdu_delete and vdu_delete.get(vdu_id_ref):
565 del vdurs[vdu_index]
566 vdu_delete[vdu_id_ref] -= 1
567 if not vdu_delete[vdu_id_ref]:
568 del vdu_delete[vdu_id_ref]
569 # check all operations are done
570 if vdu_create or vdu_delete:
571 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
572 vdu_create))
573 if vdu_delete:
574 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
575 vdu_delete))
576
577 vnfr_update = {"vdur": vdurs}
578 db_vnfr["vdur"] = vdurs
579 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
580
581 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
582 """
583 Updates database nsr with the RO info for the created vld
584 :param ns_update_nsr: dictionary to be filled with the updated info
585 :param db_nsr: content of db_nsr. This is also modified
586 :param nsr_desc_RO: nsr descriptor from RO
587 :return: Nothing, LcmException is raised on errors
588 """
589
590 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
591 for net_RO in get_iterable(nsr_desc_RO, "nets"):
592 if vld["id"] != net_RO.get("ns_net_osm_id"):
593 continue
594 vld["vim-id"] = net_RO.get("vim_net_id")
595 vld["name"] = net_RO.get("vim_name")
596 vld["status"] = net_RO.get("status")
597 vld["status-detailed"] = net_RO.get("error_msg")
598 ns_update_nsr["vld.{}".format(vld_index)] = vld
599 break
600 else:
601 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
602
603 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
604 """
605 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
606 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
607 :param nsr_desc_RO: nsr descriptor from RO
608 :return: Nothing, LcmException is raised on errors
609 """
610 for vnf_index, db_vnfr in db_vnfrs.items():
611 for vnf_RO in nsr_desc_RO["vnfs"]:
612 if vnf_RO["member_vnf_index"] != vnf_index:
613 continue
614 vnfr_update = {}
615 if vnf_RO.get("ip_address"):
616 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
617 elif not db_vnfr.get("ip-address"):
618 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
619 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
620
621 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
622 vdur_RO_count_index = 0
623 if vdur.get("pdu-type"):
624 continue
625 for vdur_RO in get_iterable(vnf_RO, "vms"):
626 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
627 continue
628 if vdur["count-index"] != vdur_RO_count_index:
629 vdur_RO_count_index += 1
630 continue
631 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
632 if vdur_RO.get("ip_address"):
633 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
634 else:
635 vdur["ip-address"] = None
636 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
637 vdur["name"] = vdur_RO.get("vim_name")
638 vdur["status"] = vdur_RO.get("status")
639 vdur["status-detailed"] = vdur_RO.get("error_msg")
640 for ifacer in get_iterable(vdur, "interfaces"):
641 for interface_RO in get_iterable(vdur_RO, "interfaces"):
642 if ifacer["name"] == interface_RO.get("internal_name"):
643 ifacer["ip-address"] = interface_RO.get("ip_address")
644 ifacer["mac-address"] = interface_RO.get("mac_address")
645 break
646 else:
647 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
648 "from VIM info"
649 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
650 vnfr_update["vdur.{}".format(vdu_index)] = vdur
651 break
652 else:
653 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
654 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
655
656 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
657 for net_RO in get_iterable(nsr_desc_RO, "nets"):
658 if vld["id"] != net_RO.get("vnf_net_osm_id"):
659 continue
660 vld["vim-id"] = net_RO.get("vim_net_id")
661 vld["name"] = net_RO.get("vim_name")
662 vld["status"] = net_RO.get("status")
663 vld["status-detailed"] = net_RO.get("error_msg")
664 vnfr_update["vld.{}".format(vld_index)] = vld
665 break
666 else:
667 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
668 vnf_index, vld["id"]))
669
670 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
671 break
672
673 else:
674 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
675
676 def _get_ns_config_info(self, nsr_id):
677 """
678 Generates a mapping between vnf,vdu elements and the N2VC id
679 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
680 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
681 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
682 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
683 """
684 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
685 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
686 mapping = {}
687 ns_config_info = {"osm-config-mapping": mapping}
688 for vca in vca_deployed_list:
689 if not vca["member-vnf-index"]:
690 continue
691 if not vca["vdu_id"]:
692 mapping[vca["member-vnf-index"]] = vca["application"]
693 else:
694 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
695 vca["application"]
696 return ns_config_info
697
698 @staticmethod
699 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
700 """
701 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
702 primitives as verify-ssh-credentials, or config when needed
703 :param desc_primitive_list: information of the descriptor
704 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
705 this element contains a ssh public key
706 :return: The modified list. Can ba an empty list, but always a list
707 """
708 if desc_primitive_list:
709 primitive_list = desc_primitive_list.copy()
710 else:
711 primitive_list = []
712 # look for primitive config, and get the position. None if not present
713 config_position = None
714 for index, primitive in enumerate(primitive_list):
715 if primitive["name"] == "config":
716 config_position = index
717 break
718
719 # for NS, add always a config primitive if not present (bug 874)
720 if not vca_deployed["member-vnf-index"] and config_position is None:
721 primitive_list.insert(0, {"name": "config", "parameter": []})
722 config_position = 0
723 # for VNF/VDU add verify-ssh-credentials after config
724 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
725 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
726 return primitive_list
727
728 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
729 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
730
731 db_nsr_update = {}
732 RO_descriptor_number = 0 # number of descriptors created at RO
733 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
734 start_deploy = time()
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # Check for and optionally request placement optimization. Database will be updated if placement activated
742 await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
743
744 # deploy RO
745
746 # get vnfds, instantiate at RO
747
748 for c_vnf in nsd.get("constituent-vnfd", ()):
749 member_vnf_index = c_vnf["member-vnf-index"]
750 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
751 vnfd_ref = vnfd["id"]
752 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
753 " RO".format(vnfd_ref, member_vnf_index)
754 # self.logger.debug(logging_text + step)
755 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
756 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
757 RO_descriptor_number += 1
758
759 # look position at deployed.RO.vnfd if not present it will be appended at the end
760 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
761 if vnf_deployed["member-vnf-index"] == member_vnf_index:
762 break
763 else:
764 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
765 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
766
767 # look if present
768 RO_update = {"member-vnf-index": member_vnf_index}
769 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
770 if vnfd_list:
771 RO_update["id"] = vnfd_list[0]["uuid"]
772 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
773 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
774 else:
775 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
776 get("additionalParamsForVnf"), nsr_id)
777 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
778 RO_update["id"] = desc["uuid"]
779 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
780 vnfd_ref, member_vnf_index, desc["uuid"]))
781 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
782 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
783 self.update_db_2("nsrs", nsr_id, db_nsr_update)
784
785 # create nsd at RO
786 nsd_ref = nsd["id"]
787 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
788 # self.logger.debug(logging_text + step)
789
790 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
791 RO_descriptor_number += 1
792 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
793 if nsd_list:
794 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
795 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
796 nsd_ref, RO_nsd_uuid))
797 else:
798 nsd_RO = deepcopy(nsd)
799 nsd_RO["id"] = RO_osm_nsd_id
800 nsd_RO.pop("_id", None)
801 nsd_RO.pop("_admin", None)
802 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
803 member_vnf_index = c_vnf["member-vnf-index"]
804 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
805 for c_vld in nsd_RO.get("vld", ()):
806 for cp in c_vld.get("vnfd-connection-point-ref", ()):
807 member_vnf_index = cp["member-vnf-index-ref"]
808 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
809
810 desc = await self.RO.create("nsd", descriptor=nsd_RO)
811 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
812 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
813 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
814 self.update_db_2("nsrs", nsr_id, db_nsr_update)
815
816 # Crate ns at RO
817 # if present use it unless in error status
818 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
819 if RO_nsr_id:
820 try:
821 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
822 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
823 desc = await self.RO.show("ns", RO_nsr_id)
824
825 except ROclient.ROClientException as e:
826 if e.http_code != HTTPStatus.NOT_FOUND:
827 raise
828 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
829 if RO_nsr_id:
830 ns_status, ns_status_info = self.RO.check_ns_status(desc)
831 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
832 if ns_status == "ERROR":
833 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
834 .format(RO_nsr_id)
835 self.logger.debug(logging_text + step)
836 await self.RO.delete("ns", RO_nsr_id)
837 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
838 if not RO_nsr_id:
839 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
840 # self.logger.debug(logging_text + step)
841
842 # check if VIM is creating and wait look if previous tasks in process
843 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
844 if task_dependency:
845 step = "Waiting for related tasks to be completed: {}".format(task_name)
846 self.logger.debug(logging_text + step)
847 await asyncio.wait(task_dependency, timeout=3600)
848 if ns_params.get("vnf"):
849 for vnf in ns_params["vnf"]:
850 if "vimAccountId" in vnf:
851 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
852 vnf["vimAccountId"])
853 if task_dependency:
854 step = "Waiting for related tasks to be completed: {}".format(task_name)
855 self.logger.debug(logging_text + step)
856 await asyncio.wait(task_dependency, timeout=3600)
857
858 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
859
860 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
861
862 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
863 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
864 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
865 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
866 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
867 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
868 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
869 self.update_db_2("nsrs", nsr_id, db_nsr_update)
870
871 # wait until NS is ready
872 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
873 detailed_status_old = None
874 self.logger.debug(logging_text + step)
875
876 old_desc = None
877 while time() <= start_deploy + timeout_ns_deploy:
878 desc = await self.RO.show("ns", RO_nsr_id)
879
880 # deploymentStatus
881 if desc != old_desc:
882 # desc has changed => update db
883 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
884 old_desc = desc
885
886 ns_status, ns_status_info = self.RO.check_ns_status(desc)
887 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
888 if ns_status == "ERROR":
889 raise ROclient.ROClientException(ns_status_info)
890 elif ns_status == "BUILD":
891 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
892 elif ns_status == "ACTIVE":
893 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
894 try:
895 self.ns_update_vnfr(db_vnfrs, desc)
896 break
897 except LcmExceptionNoMgmtIP:
898 pass
899 else:
900 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
901 if detailed_status != detailed_status_old:
902 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 await asyncio.sleep(5, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise ROclient.ROClientException("Timeout waiting ns to be ready")
907
908 step = "Updating NSR"
909 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
910
911 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
912 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
913 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
915 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
916
917 step = "Deployed at VIM"
918 self.logger.debug(logging_text + step)
919
920 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
921 """
922 Wait for ip addres at RO, and optionally, insert public key in virtual machine
923 :param logging_text: prefix use for logging
924 :param nsr_id:
925 :param vnfr_id:
926 :param vdu_id:
927 :param vdu_index:
928 :param pub_key: public ssh key to inject, None to skip
929 :param user: user to apply the public ssh key
930 :return: IP address
931 """
932
933 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
934 ro_nsr_id = None
935 ip_address = None
936 nb_tries = 0
937 target_vdu_id = None
938 ro_retries = 0
939
940 while True:
941
942 ro_retries += 1
943 if ro_retries >= 360: # 1 hour
944 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
945
946 await asyncio.sleep(10, loop=self.loop)
947 # wait until NS is deployed at RO
948 if not ro_nsr_id:
949 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
950 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
951 if not ro_nsr_id:
952 continue
953
954 # get ip address
955 if not target_vdu_id:
956 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
957
958 if not vdu_id: # for the VNF case
959 ip_address = db_vnfr.get("ip-address")
960 if not ip_address:
961 continue
962 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
963 else: # VDU case
964 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
965 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
966
967 if not vdur:
968 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
969 vnfr_id, vdu_id, vdu_index
970 ))
971
972 if vdur.get("status") == "ACTIVE":
973 ip_address = vdur.get("ip-address")
974 if not ip_address:
975 continue
976 target_vdu_id = vdur["vdu-id-ref"]
977 elif vdur.get("status") == "ERROR":
978 raise LcmException("Cannot inject ssh-key because target VM is in error state")
979
980 if not target_vdu_id:
981 continue
982
983 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
984
985 # inject public key into machine
986 if pub_key and user:
987 # self.logger.debug(logging_text + "Inserting RO key")
988 try:
989 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
990 result_dict = await self.RO.create_action(
991 item="ns",
992 item_id_name=ro_nsr_id,
993 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
994 )
995 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
996 if not result_dict or not isinstance(result_dict, dict):
997 raise LcmException("Unknown response from RO when injecting key")
998 for result in result_dict.values():
999 if result.get("vim_result") == 200:
1000 break
1001 else:
1002 raise ROclient.ROClientException("error injecting key: {}".format(
1003 result.get("description")))
1004 break
1005 except ROclient.ROClientException as e:
1006 if not nb_tries:
1007 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1008 format(e, 20*10))
1009 nb_tries += 1
1010 if nb_tries >= 20:
1011 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1012 else:
1013 break
1014
1015 return ip_address
1016
1017 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1018 """
1019 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1020 """
1021 my_vca = vca_deployed_list[vca_index]
1022 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1023 # vdu or kdu: no dependencies
1024 return
1025 timeout = 300
1026 while timeout >= 0:
1027 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1028 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1029 configuration_status_list = db_nsr["configurationStatus"]
1030 for index, vca_deployed in enumerate(configuration_status_list):
1031 if index == vca_index:
1032 # myself
1033 continue
1034 if not my_vca.get("member-vnf-index") or \
1035 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1036 internal_status = configuration_status_list[index].get("status")
1037 if internal_status == 'READY':
1038 continue
1039 elif internal_status == 'BROKEN':
1040 raise LcmException("Configuration aborted because dependent charm/s has failed")
1041 else:
1042 break
1043 else:
1044 # no dependencies, return
1045 return
1046 await asyncio.sleep(10)
1047 timeout -= 1
1048
1049 raise LcmException("Configuration aborted because dependent charm/s timeout")
1050
1051 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
1052 vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
1053 nsr_id = db_nsr["_id"]
1054 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1055 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1056 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1057 db_dict = {
1058 'collection': 'nsrs',
1059 'filter': {'_id': nsr_id},
1060 'path': db_update_entry
1061 }
1062 step = ""
1063 try:
1064
1065 element_type = 'NS'
1066 element_under_configuration = nsr_id
1067
1068 vnfr_id = None
1069 if db_vnfr:
1070 vnfr_id = db_vnfr["_id"]
1071
1072 namespace = "{nsi}.{ns}".format(
1073 nsi=nsi_id if nsi_id else "",
1074 ns=nsr_id)
1075
1076 if vnfr_id:
1077 element_type = 'VNF'
1078 element_under_configuration = vnfr_id
1079 namespace += ".{}".format(vnfr_id)
1080 if vdu_id:
1081 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1082 element_type = 'VDU'
1083 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1084
1085 # Get artifact path
1086 self.fs.sync() # Sync from FSMongo
1087 artifact_path = "{}/{}/charms/{}".format(
1088 base_folder["folder"],
1089 base_folder["pkg-dir"],
1090 config_descriptor["juju"]["charm"]
1091 )
1092
1093 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1094 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1095 is_proxy_charm = False
1096
1097 # n2vc_redesign STEP 3.1
1098
1099 # find old ee_id if exists
1100 ee_id = vca_deployed.get("ee_id")
1101
1102 # create or register execution environment in VCA
1103 if is_proxy_charm:
1104
1105 self._write_configuration_status(
1106 nsr_id=nsr_id,
1107 vca_index=vca_index,
1108 status='CREATING',
1109 element_under_configuration=element_under_configuration,
1110 element_type=element_type
1111 )
1112
1113 step = "create execution environment"
1114 self.logger.debug(logging_text + step)
1115 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1116 reuse_ee_id=ee_id,
1117 db_dict=db_dict)
1118
1119 else:
1120 step = "Waiting to VM being up and getting IP address"
1121 self.logger.debug(logging_text + step)
1122 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1123 user=None, pub_key=None)
1124 credentials = {"hostname": rw_mgmt_ip}
1125 # get username
1126 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1127 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1128 # merged. Meanwhile let's get username from initial-config-primitive
1129 if not username and config_descriptor.get("initial-config-primitive"):
1130 for config_primitive in config_descriptor["initial-config-primitive"]:
1131 for param in config_primitive.get("parameter", ()):
1132 if param["name"] == "ssh-username":
1133 username = param["value"]
1134 break
1135 if not username:
1136 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1137 "'config-access.ssh-access.default-user'")
1138 credentials["username"] = username
1139 # n2vc_redesign STEP 3.2
1140
1141 self._write_configuration_status(
1142 nsr_id=nsr_id,
1143 vca_index=vca_index,
1144 status='REGISTERING',
1145 element_under_configuration=element_under_configuration,
1146 element_type=element_type
1147 )
1148
1149 step = "register execution environment {}".format(credentials)
1150 self.logger.debug(logging_text + step)
1151 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1152 db_dict=db_dict)
1153
1154 # for compatibility with MON/POL modules, the need model and application name at database
1155 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1156 ee_id_parts = ee_id.split('.')
1157 model_name = ee_id_parts[0]
1158 application_name = ee_id_parts[1]
1159 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1160 db_update_entry + "application": application_name,
1161 db_update_entry + "ee_id": ee_id})
1162
1163 # n2vc_redesign STEP 3.3
1164
1165 step = "Install configuration Software"
1166
1167 self._write_configuration_status(
1168 nsr_id=nsr_id,
1169 vca_index=vca_index,
1170 status='INSTALLING SW',
1171 element_under_configuration=element_under_configuration,
1172 element_type=element_type
1173 )
1174
1175 # TODO check if already done
1176 self.logger.debug(logging_text + step)
1177 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1178
1179 # write in db flag of configuration_sw already installed
1180 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1181
1182 # add relations for this VCA (wait for other peers related with this VCA)
1183 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)
1184
1185 # if SSH access is required, then get execution environment SSH public
1186 if is_proxy_charm: # if native charm we have waited already to VM be UP
1187 pub_key = None
1188 user = None
1189 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1190 # Needed to inject a ssh key
1191 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1192 step = "Install configuration Software, getting public ssh key"
1193 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1194
1195 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1196 else:
1197 step = "Waiting to VM being up and getting IP address"
1198 self.logger.debug(logging_text + step)
1199
1200 # n2vc_redesign STEP 5.1
1201 # wait for RO (ip-address) Insert pub_key into VM
1202 if vnfr_id:
1203 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1204 user=user, pub_key=pub_key)
1205 else:
1206 rw_mgmt_ip = None # This is for a NS configuration
1207
1208 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1209
1210 # store rw_mgmt_ip in deploy params for later replacement
1211 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1212
1213 # n2vc_redesign STEP 6 Execute initial config primitive
1214 step = 'execute initial config primitive'
1215 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1216
1217 # sort initial config primitives by 'seq'
1218 if initial_config_primitive_list:
1219 try:
1220 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1221 except Exception as e:
1222 self.logger.error(logging_text + step + ": " + str(e))
1223 else:
1224 self.logger.debug(logging_text + step + ": No initial-config-primitive")
1225
1226 # add config if not present for NS charm
1227 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1228 vca_deployed)
1229
1230 # wait for dependent primitives execution (NS -> VNF -> VDU)
1231 if initial_config_primitive_list:
1232 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1233
1234 # stage, in function of element type: vdu, kdu, vnf or ns
1235 my_vca = vca_deployed_list[vca_index]
1236 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1237 # VDU or KDU
1238 stage = 'Stage 3/5: running Day-1 primitives for VDU'
1239 elif my_vca.get("member-vnf-index"):
1240 # VNF
1241 stage = 'Stage 4/5: running Day-1 primitives for VNF'
1242 else:
1243 # NS
1244 stage = 'Stage 5/5: running Day-1 primitives for NS'
1245
1246 self._write_configuration_status(
1247 nsr_id=nsr_id,
1248 vca_index=vca_index,
1249 status='EXECUTING PRIMITIVE'
1250 )
1251
1252 self._write_op_status(
1253 op_id=nslcmop_id,
1254 stage=stage
1255 )
1256
1257 for initial_config_primitive in initial_config_primitive_list:
1258 # adding information on the vca_deployed if it is a NS execution environment
1259 if not vca_deployed["member-vnf-index"]:
1260 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1261 # TODO check if already done
1262 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1263
1264 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1265 self.logger.debug(logging_text + step)
1266 await self.n2vc.exec_primitive(
1267 ee_id=ee_id,
1268 primitive_name=initial_config_primitive["name"],
1269 params_dict=primitive_params_,
1270 db_dict=db_dict
1271 )
1272
1273 # TODO register in database that primitive is done
1274
1275 step = "instantiated at VCA"
1276 self.logger.debug(logging_text + step)
1277
1278 self._write_configuration_status(
1279 nsr_id=nsr_id,
1280 vca_index=vca_index,
1281 status='READY'
1282 )
1283
1284 except Exception as e: # TODO not use Exception but N2VC exception
1285 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1286 self._write_configuration_status(
1287 nsr_id=nsr_id,
1288 vca_index=vca_index,
1289 status='BROKEN'
1290 )
1291 raise Exception("{} {}".format(step, e)) from e
1292 # TODO raise N2VC exception with 'step' extra information
1293
1294 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1295 error_description: str = None):
1296 try:
1297 db_dict = dict()
1298 if ns_state:
1299 db_dict["nsState"] = ns_state
1300 db_dict["currentOperation"] = current_operation
1301 db_dict["currentOperationID"] = current_operation_id
1302 db_dict["errorDescription"] = error_description
1303 self.update_db_2("nsrs", nsr_id, db_dict)
1304 except Exception as e:
1305 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1306
1307 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1308 try:
1309 db_dict = dict()
1310 db_dict['queuePosition'] = queuePosition
1311 db_dict['stage'] = stage
1312 if error_message:
1313 db_dict['errorMessage'] = error_message
1314 self.update_db_2("nslcmops", op_id, db_dict)
1315 except Exception as e:
1316 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1317
1318 def _write_all_config_status(self, nsr_id: str, status: str):
1319 try:
1320 # nsrs record
1321 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1322 # configurationStatus
1323 config_status = db_nsr.get('configurationStatus')
1324 if config_status:
1325 # update status
1326 db_dict = dict()
1327 db_dict['configurationStatus'] = list()
1328 for c in config_status:
1329 c['status'] = status
1330 db_dict['configurationStatus'].append(c)
1331 self.update_db_2("nsrs", nsr_id, db_dict)
1332
1333 except Exception as e:
1334 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1335
1336 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1337 element_under_configuration: str = None, element_type: str = None):
1338
1339 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1340 # .format(vca_index, status))
1341
1342 try:
1343 db_path = 'configurationStatus.{}.'.format(vca_index)
1344 db_dict = dict()
1345 if status:
1346 db_dict[db_path + 'status'] = status
1347 if element_under_configuration:
1348 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1349 if element_type:
1350 db_dict[db_path + 'elementType'] = element_type
1351 self.update_db_2("nsrs", nsr_id, db_dict)
1352 except Exception as e:
1353 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1354 .format(status, nsr_id, vca_index, e))
1355
1356 async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1357 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1358 if placement_engine == "PLA":
1359 self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
1360 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
1361 db_poll_interval = 5
1362 wait = db_poll_interval * 4
1363 pla_result = None
1364 while not pla_result and wait >= 0:
1365 await asyncio.sleep(db_poll_interval)
1366 wait -= db_poll_interval
1367 db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
1368 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1369
1370 if not pla_result:
1371 raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
1372
1373 for pla_vnf in pla_result['vnf']:
1374 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1375 if not pla_vnf.get('vimAccountId') or not vnfr:
1376 continue
1377 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1378 return
1379
1380 def update_nsrs_with_pla_result(self, params):
1381 try:
1382 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1383 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1384 except Exception as e:
1385 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1386
1387 async def instantiate(self, nsr_id, nslcmop_id):
1388 """
1389
1390 :param nsr_id: ns instance to deploy
1391 :param nslcmop_id: operation to run
1392 :return:
1393 """
1394
1395 # Try to lock HA task here
1396 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1397 if not task_is_locked_by_me:
1398 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1399 return
1400
1401 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1402 self.logger.debug(logging_text + "Enter")
1403
1404 # get all needed from database
1405
1406 # database nsrs record
1407 db_nsr = None
1408
1409 # database nslcmops record
1410 db_nslcmop = None
1411
1412 # update operation on nsrs
1413 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1414 "_admin.current-operation": nslcmop_id,
1415 "_admin.operation-type": "instantiate"}
1416 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1417
1418 # update operation on nslcmops
1419 db_nslcmop_update = {}
1420
1421 nslcmop_operation_state = None
1422 db_vnfrs = {} # vnf's info indexed by member-index
1423 # n2vc_info = {}
1424 task_instantiation_list = []
1425 task_instantiation_info = {} # from task to info text
1426 exc = None
1427 try:
1428 # wait for any previous tasks in process
1429 step = "Waiting for previous operations to terminate"
1430 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1431
1432 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1433
1434 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1435 self._write_ns_status(
1436 nsr_id=nsr_id,
1437 ns_state="BUILDING",
1438 current_operation="INSTANTIATING",
1439 current_operation_id=nslcmop_id
1440 )
1441
1442 # read from db: operation
1443 step = "Getting nslcmop={} from db".format(nslcmop_id)
1444 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1445 ns_params = db_nslcmop.get("operationParams")
1446 if ns_params and ns_params.get("timeout_ns_deploy"):
1447 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1448 else:
1449 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1450
1451 # read from db: ns
1452 step = "Getting nsr={} from db".format(nsr_id)
1453 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1454 # nsd is replicated into ns (no db read)
1455 nsd = db_nsr["nsd"]
1456 # nsr_name = db_nsr["name"] # TODO short-name??
1457
1458 # read from db: vnf's of this ns
1459 step = "Getting vnfrs from db"
1460 self.logger.debug(logging_text + step)
1461 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1462
1463 # read from db: vnfd's for every vnf
1464 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1465 db_vnfds = {} # every vnfd data indexed by vnf id
1466 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1467
1468 self._write_op_status(
1469 op_id=nslcmop_id,
1470 stage='Stage 1/5: preparation of the environment',
1471 queuePosition=0
1472 )
1473
1474 # for each vnf in ns, read vnfd
1475 for vnfr in db_vnfrs_list:
1476 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1477 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1478 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1479 # if we haven't this vnfd, read it from db
1480 if vnfd_id not in db_vnfds:
1481 # read from db
1482 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1483 self.logger.debug(logging_text + step)
1484 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1485
1486 # store vnfd
1487 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1488 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1489 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1490
1491 # Get or generates the _admin.deployed.VCA list
1492 vca_deployed_list = None
1493 if db_nsr["_admin"].get("deployed"):
1494 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1495 if vca_deployed_list is None:
1496 vca_deployed_list = []
1497 configuration_status_list = []
1498 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1499 db_nsr_update["configurationStatus"] = configuration_status_list
1500 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1501 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1502 elif isinstance(vca_deployed_list, dict):
1503 # maintain backward compatibility. Change a dict to list at database
1504 vca_deployed_list = list(vca_deployed_list.values())
1505 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1506 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1507
1508 db_nsr_update["detailed-status"] = "creating"
1509 db_nsr_update["operational-status"] = "init"
1510
1511 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1512 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1513 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1514
1515 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1516 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1517 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1518
1519 # n2vc_redesign STEP 2 Deploy Network Scenario
1520
1521 self._write_op_status(
1522 op_id=nslcmop_id,
1523 stage='Stage 2/5: deployment of VMs and execution environments'
1524 )
1525
1526 self.logger.debug(logging_text + "Before deploy_kdus")
1527 # Call to deploy_kdus in case exists the "vdu:kdu" param
1528 task_kdu = asyncio.ensure_future(
1529 self.deploy_kdus(
1530 logging_text=logging_text,
1531 nsr_id=nsr_id,
1532 db_nsr=db_nsr,
1533 db_vnfrs=db_vnfrs,
1534 db_vnfds=db_vnfds
1535 )
1536 )
1537 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1538 task_instantiation_info[task_kdu] = "Deploy KDUs"
1539 task_instantiation_list.append(task_kdu)
1540 # n2vc_redesign STEP 1 Get VCA public ssh-key
1541 # feature 1429. Add n2vc public key to needed VMs
1542 n2vc_key = self.n2vc.get_public_key()
1543 n2vc_key_list = [n2vc_key]
1544 if self.vca_config.get("public_key"):
1545 n2vc_key_list.append(self.vca_config["public_key"])
1546
1547 task_ro = asyncio.ensure_future(
1548 self.instantiate_RO(
1549 logging_text=logging_text,
1550 nsr_id=nsr_id,
1551 nsd=nsd,
1552 db_nsr=db_nsr,
1553 db_nslcmop=db_nslcmop,
1554 db_vnfrs=db_vnfrs,
1555 db_vnfds_ref=db_vnfds_ref,
1556 n2vc_key_list=n2vc_key_list
1557 )
1558 )
1559 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1560 task_instantiation_info[task_ro] = "Deploy at VIM"
1561 task_instantiation_list.append(task_ro)
1562
1563 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1564 step = "Deploying proxy and native charms"
1565 self.logger.debug(logging_text + step)
1566
1567 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1568 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1569 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1570 vnfd_id = c_vnf["vnfd-id-ref"]
1571 vnfd = db_vnfds_ref[vnfd_id]
1572 member_vnf_index = str(c_vnf["member-vnf-index"])
1573 db_vnfr = db_vnfrs[member_vnf_index]
1574 base_folder = vnfd["_admin"]["storage"]
1575 vdu_id = None
1576 vdu_index = 0
1577 vdu_name = None
1578 kdu_name = None
1579
1580 # Get additional parameters
1581 deploy_params = {}
1582 if db_vnfr.get("additionalParamsForVnf"):
1583 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1584
1585 descriptor_config = vnfd.get("vnf-configuration")
1586 if descriptor_config and descriptor_config.get("juju"):
1587 self._deploy_n2vc(
1588 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1589 db_nsr=db_nsr,
1590 db_vnfr=db_vnfr,
1591 nslcmop_id=nslcmop_id,
1592 nsr_id=nsr_id,
1593 nsi_id=nsi_id,
1594 vnfd_id=vnfd_id,
1595 vdu_id=vdu_id,
1596 kdu_name=kdu_name,
1597 member_vnf_index=member_vnf_index,
1598 vdu_index=vdu_index,
1599 vdu_name=vdu_name,
1600 deploy_params=deploy_params,
1601 descriptor_config=descriptor_config,
1602 base_folder=base_folder,
1603 task_instantiation_list=task_instantiation_list,
1604 task_instantiation_info=task_instantiation_info
1605 )
1606
1607 # Deploy charms for each VDU that supports one.
1608 for vdud in get_iterable(vnfd, 'vdu'):
1609 vdu_id = vdud["id"]
1610 descriptor_config = vdud.get('vdu-configuration')
1611 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1612 if vdur.get("additionalParams"):
1613 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1614 else:
1615 deploy_params_vdu = deploy_params
1616 if descriptor_config and descriptor_config.get("juju"):
1617 # look for vdu index in the db_vnfr["vdu"] section
1618 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1619 # if vdur["vdu-id-ref"] == vdu_id:
1620 # break
1621 # else:
1622 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1623 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1624 # vdu_name = vdur.get("name")
1625 vdu_name = None
1626 kdu_name = None
1627 for vdu_index in range(int(vdud.get("count", 1))):
1628 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1629 self._deploy_n2vc(
1630 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1631 member_vnf_index, vdu_id, vdu_index),
1632 db_nsr=db_nsr,
1633 db_vnfr=db_vnfr,
1634 nslcmop_id=nslcmop_id,
1635 nsr_id=nsr_id,
1636 nsi_id=nsi_id,
1637 vnfd_id=vnfd_id,
1638 vdu_id=vdu_id,
1639 kdu_name=kdu_name,
1640 member_vnf_index=member_vnf_index,
1641 vdu_index=vdu_index,
1642 vdu_name=vdu_name,
1643 deploy_params=deploy_params_vdu,
1644 descriptor_config=descriptor_config,
1645 base_folder=base_folder,
1646 task_instantiation_list=task_instantiation_list,
1647 task_instantiation_info=task_instantiation_info
1648 )
1649 for kdud in get_iterable(vnfd, 'kdu'):
1650 kdu_name = kdud["name"]
1651 descriptor_config = kdud.get('kdu-configuration')
1652 if descriptor_config and descriptor_config.get("juju"):
1653 vdu_id = None
1654 vdu_index = 0
1655 vdu_name = None
1656 # look for vdu index in the db_vnfr["vdu"] section
1657 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1658 # if vdur["vdu-id-ref"] == vdu_id:
1659 # break
1660 # else:
1661 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1662 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1663 # vdu_name = vdur.get("name")
1664 # vdu_name = None
1665
1666 self._deploy_n2vc(
1667 logging_text=logging_text,
1668 db_nsr=db_nsr,
1669 db_vnfr=db_vnfr,
1670 nslcmop_id=nslcmop_id,
1671 nsr_id=nsr_id,
1672 nsi_id=nsi_id,
1673 vnfd_id=vnfd_id,
1674 vdu_id=vdu_id,
1675 kdu_name=kdu_name,
1676 member_vnf_index=member_vnf_index,
1677 vdu_index=vdu_index,
1678 vdu_name=vdu_name,
1679 deploy_params=deploy_params,
1680 descriptor_config=descriptor_config,
1681 base_folder=base_folder,
1682 task_instantiation_list=task_instantiation_list,
1683 task_instantiation_info=task_instantiation_info
1684 )
1685
1686 # Check if this NS has a charm configuration
1687 descriptor_config = nsd.get("ns-configuration")
1688 if descriptor_config and descriptor_config.get("juju"):
1689 vnfd_id = None
1690 db_vnfr = None
1691 member_vnf_index = None
1692 vdu_id = None
1693 kdu_name = None
1694 vdu_index = 0
1695 vdu_name = None
1696
1697 # Get additional parameters
1698 deploy_params = {}
1699 if db_nsr.get("additionalParamsForNs"):
1700 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1701 base_folder = nsd["_admin"]["storage"]
1702 self._deploy_n2vc(
1703 logging_text=logging_text,
1704 db_nsr=db_nsr,
1705 db_vnfr=db_vnfr,
1706 nslcmop_id=nslcmop_id,
1707 nsr_id=nsr_id,
1708 nsi_id=nsi_id,
1709 vnfd_id=vnfd_id,
1710 vdu_id=vdu_id,
1711 kdu_name=kdu_name,
1712 member_vnf_index=member_vnf_index,
1713 vdu_index=vdu_index,
1714 vdu_name=vdu_name,
1715 deploy_params=deploy_params,
1716 descriptor_config=descriptor_config,
1717 base_folder=base_folder,
1718 task_instantiation_list=task_instantiation_list,
1719 task_instantiation_info=task_instantiation_info
1720 )
1721
1722 # Wait until all tasks of "task_instantiation_list" have been finished
1723
1724 error_text_list = []
1725
1726 # let's begin with all OK
1727 instantiated_ok = True
1728 # let's begin with RO 'running' status (later we can change it)
1729 db_nsr_update["operational-status"] = "running"
1730 # let's begin with VCA 'configured' status (later we can change it)
1731 db_nsr_update["config-status"] = "configured"
1732
1733 step = "Waiting for tasks to be finished"
1734 if task_instantiation_list:
1735 # wait for all tasks completion
1736 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1737
1738 for task in pending:
1739 instantiated_ok = False
1740 if task in (task_ro, task_kdu):
1741 # RO or KDU task is pending
1742 db_nsr_update["operational-status"] = "failed"
1743 else:
1744 # A N2VC task is pending
1745 db_nsr_update["config-status"] = "failed"
1746 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1747 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1748 for task in done:
1749 if task.cancelled():
1750 instantiated_ok = False
1751 if task in (task_ro, task_kdu):
1752 # RO or KDU task was cancelled
1753 db_nsr_update["operational-status"] = "failed"
1754 else:
1755 # A N2VC was cancelled
1756 db_nsr_update["config-status"] = "failed"
1757 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1758 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1759 else:
1760 exc = task.exception()
1761 if exc:
1762 instantiated_ok = False
1763 if task in (task_ro, task_kdu):
1764 # RO or KDU task raised an exception
1765 db_nsr_update["operational-status"] = "failed"
1766 else:
1767 # A N2VC task raised an exception
1768 db_nsr_update["config-status"] = "failed"
1769 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1770
1771 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1772 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1773 else:
1774 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1775 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1776 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1777 else:
1778 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1779
1780 if error_text_list:
1781 error_text = "\n".join(error_text_list)
1782 db_nsr_update["detailed-status"] = error_text
1783 db_nslcmop_update["detailed-status"] = error_text
1784 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1785 db_nslcmop_update["statusEnteredTime"] = time()
1786 else:
1787 # all is done
1788 db_nsr_update["detailed-status"] = "done"
1789 db_nslcmop_update["detailed-status"] = "done"
1790 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1791 db_nslcmop_update["statusEnteredTime"] = time()
1792
1793 except (ROclient.ROClientException, DbException, LcmException) as e:
1794 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1795 exc = e
1796 except asyncio.CancelledError:
1797 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1798 exc = "Operation was cancelled"
1799 except Exception as e:
1800 exc = traceback.format_exc()
1801 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1802 exc_info=True)
1803 finally:
1804 if exc:
1805 if db_nsr:
1806 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1807 db_nsr_update["operational-status"] = "failed"
1808 db_nsr_update["config-status"] = "failed"
1809 if db_nslcmop:
1810 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1811 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1812 db_nslcmop_update["statusEnteredTime"] = time()
1813 try:
1814 if db_nsr:
1815 db_nsr_update["_admin.nslcmop"] = None
1816 db_nsr_update["_admin.current-operation"] = None
1817 db_nsr_update["_admin.operation-type"] = None
1818 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1819
1820 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1821 ns_state = None
1822 error_description = None
1823 if instantiated_ok:
1824 ns_state = "READY"
1825 else:
1826 ns_state = "BROKEN"
1827 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1828
1829 self._write_ns_status(
1830 nsr_id=nsr_id,
1831 ns_state=ns_state,
1832 current_operation="IDLE",
1833 current_operation_id=None,
1834 error_description=error_description
1835 )
1836
1837 self._write_op_status(
1838 op_id=nslcmop_id,
1839 error_message=error_description
1840 )
1841
1842 if db_nslcmop_update:
1843 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1844
1845 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1846
1847 except DbException as e:
1848 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1849
1850 if nslcmop_operation_state:
1851 try:
1852 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1853 "operationState": nslcmop_operation_state},
1854 loop=self.loop)
1855 except Exception as e:
1856 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1857
1858 self.logger.debug(logging_text + "Exit")
1859 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1860
1861 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
1862
1863 # steps:
1864 # 1. find all relations for this VCA
1865 # 2. wait for other peers related
1866 # 3. add relations
1867
1868 try:
1869
1870 # STEP 1: find all relations for this VCA
1871
1872 # read nsr record
1873 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1874
1875 # this VCA data
1876 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
1877
1878 # read all ns-configuration relations
1879 ns_relations = list()
1880 db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
1881 if db_ns_relations:
1882 for r in db_ns_relations:
1883 # check if this VCA is in the relation
1884 if my_vca.get('member-vnf-index') in\
1885 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1886 ns_relations.append(r)
1887
1888 # read all vnf-configuration relations
1889 vnf_relations = list()
1890 db_vnfd_list = db_nsr.get('vnfd-id')
1891 if db_vnfd_list:
1892 for vnfd in db_vnfd_list:
1893 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
1894 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
1895 if db_vnf_relations:
1896 for r in db_vnf_relations:
1897 # check if this VCA is in the relation
1898 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1899 vnf_relations.append(r)
1900
1901 # if no relations, terminate
1902 if not ns_relations and not vnf_relations:
1903 self.logger.debug(logging_text + ' No relations')
1904 return True
1905
1906 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
1907
1908 # add all relations
1909 start = time()
1910 while True:
1911 # check timeout
1912 now = time()
1913 if now - start >= timeout:
1914 self.logger.error(logging_text + ' : timeout adding relations')
1915 return False
1916
1917 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
1918 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1919
1920 # for each defined NS relation, find the VCA's related
1921 for r in ns_relations:
1922 from_vca_ee_id = None
1923 to_vca_ee_id = None
1924 from_vca_endpoint = None
1925 to_vca_endpoint = None
1926 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
1927 for vca in vca_list:
1928 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
1929 and vca.get('config_sw_installed'):
1930 from_vca_ee_id = vca.get('ee_id')
1931 from_vca_endpoint = r.get('entities')[0].get('endpoint')
1932 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
1933 and vca.get('config_sw_installed'):
1934 to_vca_ee_id = vca.get('ee_id')
1935 to_vca_endpoint = r.get('entities')[1].get('endpoint')
1936 if from_vca_ee_id and to_vca_ee_id:
1937 # add relation
1938 await self.n2vc.add_relation(
1939 ee_id_1=from_vca_ee_id,
1940 ee_id_2=to_vca_ee_id,
1941 endpoint_1=from_vca_endpoint,
1942 endpoint_2=to_vca_endpoint)
1943 # remove entry from relations list
1944 ns_relations.remove(r)
1945 else:
1946 # check failed peers
1947 try:
1948 vca_status_list = db_nsr.get('configurationStatus')
1949 if vca_status_list:
1950 for i in range(len(vca_list)):
1951 vca = vca_list[i]
1952 vca_status = vca_status_list[i]
1953 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
1954 if vca_status.get('status') == 'BROKEN':
1955 # peer broken: remove relation from list
1956 ns_relations.remove(r)
1957 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
1958 if vca_status.get('status') == 'BROKEN':
1959 # peer broken: remove relation from list
1960 ns_relations.remove(r)
1961 except Exception:
1962 # ignore
1963 pass
1964
1965 # for each defined VNF relation, find the VCA's related
1966 for r in vnf_relations:
1967 from_vca_ee_id = None
1968 to_vca_ee_id = None
1969 from_vca_endpoint = None
1970 to_vca_endpoint = None
1971 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
1972 for vca in vca_list:
1973 if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
1974 from_vca_ee_id = vca.get('ee_id')
1975 from_vca_endpoint = r.get('entities')[0].get('endpoint')
1976 if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
1977 to_vca_ee_id = vca.get('ee_id')
1978 to_vca_endpoint = r.get('entities')[1].get('endpoint')
1979 if from_vca_ee_id and to_vca_ee_id:
1980 # add relation
1981 await self.n2vc.add_relation(
1982 ee_id_1=from_vca_ee_id,
1983 ee_id_2=to_vca_ee_id,
1984 endpoint_1=from_vca_endpoint,
1985 endpoint_2=to_vca_endpoint)
1986 # remove entry from relations list
1987 vnf_relations.remove(r)
1988 else:
1989 # check failed peers
1990 try:
1991 vca_status_list = db_nsr.get('configurationStatus')
1992 if vca_status_list:
1993 for i in range(len(vca_list)):
1994 vca = vca_list[i]
1995 vca_status = vca_status_list[i]
1996 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
1997 if vca_status.get('status') == 'BROKEN':
1998 # peer broken: remove relation from list
1999 ns_relations.remove(r)
2000 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2001 if vca_status.get('status') == 'BROKEN':
2002 # peer broken: remove relation from list
2003 ns_relations.remove(r)
2004 except Exception:
2005 # ignore
2006 pass
2007
2008 # wait for next try
2009 await asyncio.sleep(5.0)
2010
2011 if not ns_relations and not vnf_relations:
2012 self.logger.debug('Relations added')
2013 break
2014
2015 return True
2016
2017 except Exception as e:
2018 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2019 return False
2020
2021 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
2022 # Launch kdus if present in the descriptor
2023
2024 deployed_ok = True
2025
2026 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
2027
2028 def _get_cluster_id(cluster_id, cluster_type):
2029 nonlocal k8scluster_id_2_uuic
2030 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2031 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2032
2033 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2034 if not db_k8scluster:
2035 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2036 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2037 if not k8s_id:
2038 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
2039 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2040 return k8s_id
2041
2042 logging_text += "Deploy kdus: "
2043 try:
2044 db_nsr_update = {"_admin.deployed.K8s": []}
2045 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2046
2047 # Look for all vnfds
2048 pending_tasks = {}
2049 index = 0
2050 for vnfr_data in db_vnfrs.values():
2051 for kdur in get_iterable(vnfr_data, "kdur"):
2052 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2053 kdumodel = None
2054 k8sclustertype = None
2055 error_text = None
2056 cluster_uuid = None
2057 vnfd_id = vnfr_data.get('vnfd-id')
2058 pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
2059 if kdur.get("helm-chart"):
2060 kdumodel = kdur["helm-chart"]
2061 k8sclustertype = "chart"
2062 k8sclustertype_full = "helm-chart"
2063 elif kdur.get("juju-bundle"):
2064 kdumodel = kdur["juju-bundle"]
2065 k8sclustertype = "juju"
2066 k8sclustertype_full = "juju-bundle"
2067 else:
2068 error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
2069 " running"
2070 # check if kdumodel is a file and exists
2071 try:
2072 # path format: /vnfdid/pkkdir/kdumodel
2073 filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
2074 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2075 kdumodel = self.fs.path + filename
2076 except Exception:
2077 # it is not a file
2078 pass
2079 try:
2080 if not error_text:
2081 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
2082 except LcmException as e:
2083 error_text = str(e)
2084 deployed_ok = False
2085
2086 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
2087
2088 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
2089 "k8scluster-type": k8sclustertype,
2090 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
2091 if error_text:
2092 k8s_instace_info["detailed-status"] = error_text
2093 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
2094 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2095 if error_text:
2096 continue
2097
2098 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
2099 "{}".format(index)}
2100 if k8sclustertype == "chart":
2101 task = asyncio.ensure_future(
2102 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
2103 params=desc_params, db_dict=db_dict, timeout=3600)
2104 )
2105 else:
2106 task = asyncio.ensure_future(
2107 self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
2108 atomic=True, params=desc_params,
2109 db_dict=db_dict, timeout=600,
2110 kdu_name=kdur["kdu-name"])
2111 )
2112
2113 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
2114 index += 1
2115
2116 if pending_tasks:
2117 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2118 pending_list = list(pending_tasks.keys())
2119 while pending_list:
2120 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
2121 return_when=asyncio.FIRST_COMPLETED)
2122 if not done_list: # timeout
2123 for task in pending_list:
2124 db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
2125 deployed_ok = False
2126 break
2127 for task in done_list:
2128 exc = task.exception()
2129 if exc:
2130 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
2131 deployed_ok = False
2132 else:
2133 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
2134
2135 if not deployed_ok:
2136 raise LcmException('Cannot deploy KDUs')
2137
2138 except Exception as e:
2139 msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)
2140 self.logger.error(msg)
2141 raise LcmException(msg)
2142 finally:
2143 if db_nsr_update:
2144 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2145
2146 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2147 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2148 base_folder, task_instantiation_list, task_instantiation_info):
2149 # launch instantiate_N2VC in a asyncio task and register task object
2150 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2151 # if not found, create one entry and update database
2152
2153 # fill db_nsr._admin.deployed.VCA.<index>
2154 vca_index = -1
2155 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2156 if not vca_deployed:
2157 continue
2158 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2159 vca_deployed.get("vdu_id") == vdu_id and \
2160 vca_deployed.get("kdu_name") == kdu_name and \
2161 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2162 break
2163 else:
2164 # not found, create one.
2165 vca_deployed = {
2166 "member-vnf-index": member_vnf_index,
2167 "vdu_id": vdu_id,
2168 "kdu_name": kdu_name,
2169 "vdu_count_index": vdu_index,
2170 "operational-status": "init", # TODO revise
2171 "detailed-status": "", # TODO revise
2172 "step": "initial-deploy", # TODO revise
2173 "vnfd_id": vnfd_id,
2174 "vdu_name": vdu_name,
2175 }
2176 vca_index += 1
2177
2178 # create VCA and configurationStatus in db
2179 db_dict = {
2180 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2181 "configurationStatus.{}".format(vca_index): dict()
2182 }
2183 self.update_db_2("nsrs", nsr_id, db_dict)
2184
2185 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2186
2187 # Launch task
2188 task_n2vc = asyncio.ensure_future(
2189 self.instantiate_N2VC(
2190 logging_text=logging_text,
2191 vca_index=vca_index,
2192 nsi_id=nsi_id,
2193 db_nsr=db_nsr,
2194 db_vnfr=db_vnfr,
2195 vdu_id=vdu_id,
2196 kdu_name=kdu_name,
2197 vdu_index=vdu_index,
2198 deploy_params=deploy_params,
2199 config_descriptor=descriptor_config,
2200 base_folder=base_folder,
2201 nslcmop_id=nslcmop_id
2202 )
2203 )
2204 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2205 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
2206 task_instantiation_list.append(task_n2vc)
2207
2208 # Check if this VNFD has a configured terminate action
2209 def _has_terminate_config_primitive(self, vnfd):
2210 vnf_config = vnfd.get("vnf-configuration")
2211 if vnf_config and vnf_config.get("terminate-config-primitive"):
2212 return True
2213 else:
2214 return False
2215
2216 @staticmethod
2217 def _get_terminate_config_primitive_seq_list(vnfd):
2218 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2219 # No need to check for existing primitive twice, already done before
2220 vnf_config = vnfd.get("vnf-configuration")
2221 seq_list = vnf_config.get("terminate-config-primitive")
2222 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2223 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2224 return seq_list_sorted
2225
2226 @staticmethod
2227 def _create_nslcmop(nsr_id, operation, params):
2228 """
2229 Creates a ns-lcm-opp content to be stored at database.
2230 :param nsr_id: internal id of the instance
2231 :param operation: instantiate, terminate, scale, action, ...
2232 :param params: user parameters for the operation
2233 :return: dictionary following SOL005 format
2234 """
2235 # Raise exception if invalid arguments
2236 if not (nsr_id and operation and params):
2237 raise LcmException(
2238 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2239 now = time()
2240 _id = str(uuid4())
2241 nslcmop = {
2242 "id": _id,
2243 "_id": _id,
2244 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2245 "operationState": "PROCESSING",
2246 "statusEnteredTime": now,
2247 "nsInstanceId": nsr_id,
2248 "lcmOperationType": operation,
2249 "startTime": now,
2250 "isAutomaticInvocation": False,
2251 "operationParams": params,
2252 "isCancelPending": False,
2253 "links": {
2254 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2255 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2256 }
2257 }
2258 return nslcmop
2259
2260 def _format_additional_params(self, params):
2261 params = params or {}
2262 for key, value in params.items():
2263 if str(value).startswith("!!yaml "):
2264 params[key] = yaml.safe_load(value[7:])
2265 return params
2266
2267 def _get_terminate_primitive_params(self, seq, vnf_index):
2268 primitive = seq.get('name')
2269 primitive_params = {}
2270 params = {
2271 "member_vnf_index": vnf_index,
2272 "primitive": primitive,
2273 "primitive_params": primitive_params,
2274 }
2275 desc_params = {}
2276 return self._map_primitive_params(seq, params, desc_params)
2277
2278 # sub-operations
2279
2280 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2281 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2282 if (op.get('operationState') == 'COMPLETED'):
2283 # b. Skip sub-operation
2284 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2285 return self.SUBOPERATION_STATUS_SKIP
2286 else:
2287 # c. Reintent executing sub-operation
2288 # The sub-operation exists, and operationState != 'COMPLETED'
2289 # Update operationState = 'PROCESSING' to indicate a reintent.
2290 operationState = 'PROCESSING'
2291 detailed_status = 'In progress'
2292 self._update_suboperation_status(
2293 db_nslcmop, op_index, operationState, detailed_status)
2294 # Return the sub-operation index
2295 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2296 # with arguments extracted from the sub-operation
2297 return op_index
2298
2299 # Find a sub-operation where all keys in a matching dictionary must match
2300 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2301 def _find_suboperation(self, db_nslcmop, match):
2302 if (db_nslcmop and match):
2303 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2304 for i, op in enumerate(op_list):
2305 if all(op.get(k) == match[k] for k in match):
2306 return i
2307 return self.SUBOPERATION_STATUS_NOT_FOUND
2308
2309 # Update status for a sub-operation given its index
2310 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2311 # Update DB for HA tasks
2312 q_filter = {'_id': db_nslcmop['_id']}
2313 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2314 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2315 self.db.set_one("nslcmops",
2316 q_filter=q_filter,
2317 update_dict=update_dict,
2318 fail_on_empty=False)
2319
2320 # Add sub-operation, return the index of the added sub-operation
2321 # Optionally, set operationState, detailed-status, and operationType
2322 # Status and type are currently set for 'scale' sub-operations:
2323 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2324 # 'detailed-status' : status message
2325 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2326 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2327 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2328 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2329 RO_nsr_id=None, RO_scaling_info=None):
2330 if not (db_nslcmop):
2331 return self.SUBOPERATION_STATUS_NOT_FOUND
2332 # Get the "_admin.operations" list, if it exists
2333 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2334 op_list = db_nslcmop_admin.get('operations')
2335 # Create or append to the "_admin.operations" list
2336 new_op = {'member_vnf_index': vnf_index,
2337 'vdu_id': vdu_id,
2338 'vdu_count_index': vdu_count_index,
2339 'primitive': primitive,
2340 'primitive_params': mapped_primitive_params}
2341 if operationState:
2342 new_op['operationState'] = operationState
2343 if detailed_status:
2344 new_op['detailed-status'] = detailed_status
2345 if operationType:
2346 new_op['lcmOperationType'] = operationType
2347 if RO_nsr_id:
2348 new_op['RO_nsr_id'] = RO_nsr_id
2349 if RO_scaling_info:
2350 new_op['RO_scaling_info'] = RO_scaling_info
2351 if not op_list:
2352 # No existing operations, create key 'operations' with current operation as first list element
2353 db_nslcmop_admin.update({'operations': [new_op]})
2354 op_list = db_nslcmop_admin.get('operations')
2355 else:
2356 # Existing operations, append operation to list
2357 op_list.append(new_op)
2358
2359 db_nslcmop_update = {'_admin.operations': op_list}
2360 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2361 op_index = len(op_list) - 1
2362 return op_index
2363
2364 # Helper methods for scale() sub-operations
2365
2366 # pre-scale/post-scale:
2367 # Check for 3 different cases:
2368 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2369 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2370 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2371 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2372 operationType, RO_nsr_id=None, RO_scaling_info=None):
2373 # Find this sub-operation
2374 if (RO_nsr_id and RO_scaling_info):
2375 operationType = 'SCALE-RO'
2376 match = {
2377 'member_vnf_index': vnf_index,
2378 'RO_nsr_id': RO_nsr_id,
2379 'RO_scaling_info': RO_scaling_info,
2380 }
2381 else:
2382 match = {
2383 'member_vnf_index': vnf_index,
2384 'primitive': vnf_config_primitive,
2385 'primitive_params': primitive_params,
2386 'lcmOperationType': operationType
2387 }
2388 op_index = self._find_suboperation(db_nslcmop, match)
2389 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2390 # a. New sub-operation
2391 # The sub-operation does not exist, add it.
2392 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2393 # The following parameters are set to None for all kind of scaling:
2394 vdu_id = None
2395 vdu_count_index = None
2396 vdu_name = None
2397 if (RO_nsr_id and RO_scaling_info):
2398 vnf_config_primitive = None
2399 primitive_params = None
2400 else:
2401 RO_nsr_id = None
2402 RO_scaling_info = None
2403 # Initial status for sub-operation
2404 operationState = 'PROCESSING'
2405 detailed_status = 'In progress'
2406 # Add sub-operation for pre/post-scaling (zero or more operations)
2407 self._add_suboperation(db_nslcmop,
2408 vnf_index,
2409 vdu_id,
2410 vdu_count_index,
2411 vdu_name,
2412 vnf_config_primitive,
2413 primitive_params,
2414 operationState,
2415 detailed_status,
2416 operationType,
2417 RO_nsr_id,
2418 RO_scaling_info)
2419 return self.SUBOPERATION_STATUS_NEW
2420 else:
2421 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2422 # or op_index (operationState != 'COMPLETED')
2423 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2424
2425 # Function to return execution_environment id
2426
2427 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2428 for vca in vca_deployed_list:
2429 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2430 return vca["ee_id"]
2431
2432 # Helper methods for terminate()
2433
2434 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2435 """ Create a primitive with params from VNFD
2436 Called from terminate() before deleting instance
2437 Calls action() to execute the primitive """
2438 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2439 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2440 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2441 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2442 db_vnfds = {}
2443 # Loop over VNFRs
2444 for vnfr in db_vnfrs_list:
2445 vnfd_id = vnfr["vnfd-id"]
2446 vnf_index = vnfr["member-vnf-index-ref"]
2447 if vnfd_id not in db_vnfds:
2448 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2449 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2450 db_vnfds[vnfd_id] = vnfd
2451 vnfd = db_vnfds[vnfd_id]
2452 if not self._has_terminate_config_primitive(vnfd):
2453 continue
2454 # Get the primitive's sorted sequence list
2455 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2456 for seq in seq_list:
2457 # For each sequence in list, get primitive and call _ns_execute_primitive()
2458 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2459 vnf_index, seq.get("name"))
2460 self.logger.debug(logging_text + step)
2461 # Create the primitive for each sequence, i.e. "primitive": "touch"
2462 primitive = seq.get('name')
2463 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2464 # The following 3 parameters are currently set to None for 'terminate':
2465 # vdu_id, vdu_count_index, vdu_name
2466 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2467 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2468 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2469 # Add sub-operation
2470 self._add_suboperation(db_nslcmop,
2471 nslcmop_id,
2472 vnf_index,
2473 vdu_id,
2474 vdu_count_index,
2475 vdu_name,
2476 primitive,
2477 mapped_primitive_params)
2478 # Sub-operations: Call _ns_execute_primitive() instead of action()
2479 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2480 # nsr_deployed = db_nsr["_admin"]["deployed"]
2481
2482 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2483 # nsr_id, nslcmop_terminate_action_id)
2484 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2485 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2486 # if result not in result_ok:
2487 # raise LcmException(
2488 # "terminate_primitive_action for vnf_member_index={}",
2489 # " primitive={} fails with error {}".format(
2490 # vnf_index, seq.get("name"), result_detail))
2491
2492 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2493 try:
2494 await self.n2vc.exec_primitive(
2495 ee_id=ee_id,
2496 primitive_name=primitive,
2497 params_dict=mapped_primitive_params
2498 )
2499 except Exception as e:
2500 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2501 raise LcmException(
2502 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2503 .format(vnf_index, seq.get("name"), e),
2504 )
2505
2506 async def _delete_N2VC(self, nsr_id: str):
2507 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2508 namespace = "." + nsr_id
2509 await self.n2vc.delete_namespace(namespace=namespace)
2510 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2511
2512 async def terminate(self, nsr_id, nslcmop_id):
2513
2514 # Try to lock HA task here
2515 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2516 if not task_is_locked_by_me:
2517 return
2518
2519 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2520 self.logger.debug(logging_text + "Enter")
2521 db_nsr = None
2522 db_nslcmop = None
2523 exc = None
2524 failed_detail = [] # annotates all failed error messages
2525 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2526 "_admin.current-operation": nslcmop_id,
2527 "_admin.operation-type": "terminate"}
2528 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2529 db_nslcmop_update = {}
2530 nslcmop_operation_state = None
2531 autoremove = False # autoremove after terminated
2532 pending_tasks = []
2533 try:
2534 # wait for any previous tasks in process
2535 step = "Waiting for previous operations to terminate"
2536 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2537
2538 self._write_ns_status(
2539 nsr_id=nsr_id,
2540 ns_state="TERMINATING",
2541 current_operation="TERMINATING",
2542 current_operation_id=nslcmop_id
2543 )
2544 self._write_op_status(
2545 op_id=nslcmop_id,
2546 queuePosition=0
2547 )
2548
2549 step = "Getting nslcmop={} from db".format(nslcmop_id)
2550 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2551 step = "Getting nsr={} from db".format(nsr_id)
2552 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2553 # nsd = db_nsr["nsd"]
2554 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2555 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2556 return
2557 # #TODO check if VIM is creating and wait
2558 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2559 # Call internal terminate action
2560 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2561
2562 pending_tasks = []
2563
2564 db_nsr_update["operational-status"] = "terminating"
2565 db_nsr_update["config-status"] = "terminating"
2566
2567 # remove NS
2568 try:
2569 step = "delete execution environment"
2570 self.logger.debug(logging_text + step)
2571
2572 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2573 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2574
2575 pending_tasks.append(task_delete_ee)
2576 except Exception as e:
2577 msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
2578 self.logger.error(msg)
2579 failed_detail.append(msg)
2580
2581 try:
2582 # Delete from k8scluster
2583 step = "delete kdus"
2584 self.logger.debug(logging_text + step)
2585 # print(nsr_deployed)
2586 if nsr_deployed:
2587 for kdu in nsr_deployed.get("K8s", ()):
2588 kdu_instance = kdu.get("kdu-instance")
2589 if not kdu_instance:
2590 continue
2591 if kdu.get("k8scluster-type") == "chart":
2592 task_delete_kdu_instance = asyncio.ensure_future(
2593 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2594 kdu_instance=kdu_instance))
2595 elif kdu.get("k8scluster-type") == "juju":
2596 task_delete_kdu_instance = asyncio.ensure_future(
2597 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2598 kdu_instance=kdu_instance))
2599 else:
2600 self.error(logging_text + "Unknown k8s deployment type {}".
2601 format(kdu.get("k8scluster-type")))
2602 continue
2603 pending_tasks.append(task_delete_kdu_instance)
2604 except LcmException as e:
2605 msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
2606 self.logger.error(msg)
2607 failed_detail.append(msg)
2608
2609 # remove from RO
2610 RO_fail = False
2611
2612 # Delete ns
2613 RO_nsr_id = RO_delete_action = None
2614 if nsr_deployed and nsr_deployed.get("RO"):
2615 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2616 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2617 try:
2618 if RO_nsr_id:
2619 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2620 "Deleting ns from VIM"
2621 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2622 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2623 self.logger.debug(logging_text + step)
2624 desc = await self.RO.delete("ns", RO_nsr_id)
2625 RO_delete_action = desc["action_id"]
2626 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2627 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2628 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2629 if RO_delete_action:
2630 # wait until NS is deleted from VIM
2631 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2632 format(RO_nsr_id, RO_delete_action)
2633 detailed_status_old = None
2634 self.logger.debug(logging_text + step)
2635
2636 delete_timeout = 20 * 60 # 20 minutes
2637 while delete_timeout > 0:
2638 desc = await self.RO.show(
2639 "ns",
2640 item_id_name=RO_nsr_id,
2641 extra_item="action",
2642 extra_item_id=RO_delete_action)
2643
2644 # deploymentStatus
2645 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2646
2647 ns_status, ns_status_info = self.RO.check_action_status(desc)
2648 if ns_status == "ERROR":
2649 raise ROclient.ROClientException(ns_status_info)
2650 elif ns_status == "BUILD":
2651 detailed_status = step + "; {}".format(ns_status_info)
2652 elif ns_status == "ACTIVE":
2653 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2654 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2655 break
2656 else:
2657 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2658 if detailed_status != detailed_status_old:
2659 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2660 db_nsr_update["detailed-status"] = detailed_status
2661 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2662 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2663 await asyncio.sleep(5, loop=self.loop)
2664 delete_timeout -= 5
2665 else: # delete_timeout <= 0:
2666 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2667
2668 except ROclient.ROClientException as e:
2669 if e.http_code == 404: # not found
2670 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2671 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2672 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2673 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2674 elif e.http_code == 409: # conflict
2675 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2676 self.logger.debug(logging_text + failed_detail[-1])
2677 RO_fail = True
2678 else:
2679 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2680 self.logger.error(logging_text + failed_detail[-1])
2681 RO_fail = True
2682
2683 # Delete nsd
2684 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2685 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2686 try:
2687 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2688 "Deleting nsd from RO"
2689 await self.RO.delete("nsd", RO_nsd_id)
2690 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2691 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2692 except ROclient.ROClientException as e:
2693 if e.http_code == 404: # not found
2694 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2695 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2696 elif e.http_code == 409: # conflict
2697 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2698 self.logger.debug(logging_text + failed_detail[-1])
2699 RO_fail = True
2700 else:
2701 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2702 self.logger.error(logging_text + failed_detail[-1])
2703 RO_fail = True
2704
2705 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2706 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2707 if not vnf_deployed or not vnf_deployed["id"]:
2708 continue
2709 try:
2710 RO_vnfd_id = vnf_deployed["id"]
2711 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2712 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2713 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2714 await self.RO.delete("vnfd", RO_vnfd_id)
2715 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2716 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2717 except ROclient.ROClientException as e:
2718 if e.http_code == 404: # not found
2719 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2720 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2721 elif e.http_code == 409: # conflict
2722 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2723 self.logger.debug(logging_text + failed_detail[-1])
2724 else:
2725 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2726 self.logger.error(logging_text + failed_detail[-1])
2727
2728 if failed_detail:
2729 terminate_ok = False
2730 self.logger.error(logging_text + " ;".join(failed_detail))
2731 db_nsr_update["operational-status"] = "failed"
2732 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2733 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2734 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2735 db_nslcmop_update["statusEnteredTime"] = time()
2736 else:
2737 terminate_ok = True
2738 db_nsr_update["operational-status"] = "terminated"
2739 db_nsr_update["detailed-status"] = "Done"
2740 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2741 db_nslcmop_update["detailed-status"] = "Done"
2742 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2743 db_nslcmop_update["statusEnteredTime"] = time()
2744 if db_nslcmop["operationParams"].get("autoremove"):
2745 autoremove = True
2746
2747 except (ROclient.ROClientException, DbException, LcmException) as e:
2748 self.logger.error(logging_text + "Exit Exception {}".format(e))
2749 exc = e
2750 except asyncio.CancelledError:
2751 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2752 exc = "Operation was cancelled"
2753 except Exception as e:
2754 exc = traceback.format_exc()
2755 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2756 finally:
2757 if exc and db_nslcmop:
2758 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2759 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2760 db_nslcmop_update["statusEnteredTime"] = time()
2761 try:
2762 if db_nslcmop and db_nslcmop_update:
2763 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2764 if db_nsr:
2765 db_nsr_update["_admin.nslcmop"] = None
2766 db_nsr_update["_admin.current-operation"] = None
2767 db_nsr_update["_admin.operation-type"] = None
2768 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2769
2770 if terminate_ok:
2771 ns_state = "IDLE"
2772 error_description = None
2773 error_detail = None
2774 else:
2775 ns_state = "BROKEN"
2776 error_detail = "; ".join(failed_detail)
2777 error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
2778 .format(nslcmop_id, step, error_detail)
2779
2780 self._write_ns_status(
2781 nsr_id=nsr_id,
2782 ns_state=ns_state,
2783 current_operation="IDLE",
2784 current_operation_id=None,
2785 error_description=error_description
2786 )
2787
2788 self._write_op_status(
2789 op_id=nslcmop_id,
2790 error_message=error_description
2791 )
2792
2793 except DbException as e:
2794 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2795 if nslcmop_operation_state:
2796 try:
2797 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2798 "operationState": nslcmop_operation_state,
2799 "autoremove": autoremove},
2800 loop=self.loop)
2801 except Exception as e:
2802 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2803
2804 # wait for pending tasks
2805 done = None
2806 pending = None
2807 if pending_tasks:
2808 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2809 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2810 if not pending:
2811 self.logger.debug(logging_text + 'All tasks finished...')
2812 else:
2813 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2814
2815 self.logger.debug(logging_text + "Exit")
2816 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2817
2818 @staticmethod
2819 def _map_primitive_params(primitive_desc, params, instantiation_params):
2820 """
2821 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2822 The default-value is used. If it is between < > it look for a value at instantiation_params
2823 :param primitive_desc: portion of VNFD/NSD that describes primitive
2824 :param params: Params provided by user
2825 :param instantiation_params: Instantiation params provided by user
2826 :return: a dictionary with the calculated params
2827 """
2828 calculated_params = {}
2829 for parameter in primitive_desc.get("parameter", ()):
2830 param_name = parameter["name"]
2831 if param_name in params:
2832 calculated_params[param_name] = params[param_name]
2833 elif "default-value" in parameter or "value" in parameter:
2834 if "value" in parameter:
2835 calculated_params[param_name] = parameter["value"]
2836 else:
2837 calculated_params[param_name] = parameter["default-value"]
2838 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2839 and calculated_params[param_name].endswith(">"):
2840 if calculated_params[param_name][1:-1] in instantiation_params:
2841 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2842 else:
2843 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2844 format(calculated_params[param_name], primitive_desc["name"]))
2845 else:
2846 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2847 format(param_name, primitive_desc["name"]))
2848
2849 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2850 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2851 width=256)
2852 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2853 calculated_params[param_name] = calculated_params[param_name][7:]
2854
2855 # add always ns_config_info if primitive name is config
2856 if primitive_desc["name"] == "config":
2857 if "ns_config_info" in instantiation_params:
2858 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2859 return calculated_params
2860
2861 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2862 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2863
2864 # find vca_deployed record for this action
2865 try:
2866 for vca_deployed in db_deployed["VCA"]:
2867 if not vca_deployed:
2868 continue
2869 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2870 continue
2871 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2872 continue
2873 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2874 continue
2875 break
2876 else:
2877 # vca_deployed not found
2878 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2879 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2880
2881 # get ee_id
2882 ee_id = vca_deployed.get("ee_id")
2883 if not ee_id:
2884 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2885 "execution environment"
2886 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2887
2888 if primitive == "config":
2889 primitive_params = {"params": primitive_params}
2890
2891 while retries >= 0:
2892 try:
2893 output = await self.n2vc.exec_primitive(
2894 ee_id=ee_id,
2895 primitive_name=primitive,
2896 params_dict=primitive_params
2897 )
2898 # execution was OK
2899 break
2900 except Exception as e:
2901 retries -= 1
2902 if retries >= 0:
2903 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2904 # wait and retry
2905 await asyncio.sleep(retries_interval, loop=self.loop)
2906 else:
2907 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2908
2909 return output, 'OK'
2910
2911 except Exception as e:
2912 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2913
2914 async def action(self, nsr_id, nslcmop_id):
2915
2916 # Try to lock HA task here
2917 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2918 if not task_is_locked_by_me:
2919 return
2920
2921 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2922 self.logger.debug(logging_text + "Enter")
2923 # get all needed from database
2924 db_nsr = None
2925 db_nslcmop = None
2926 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2927 "_admin.current-operation": nslcmop_id,
2928 "_admin.operation-type": "action"}
2929 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2930 db_nslcmop_update = {}
2931 nslcmop_operation_state = None
2932 nslcmop_operation_state_detail = None
2933 exc = None
2934 try:
2935 # wait for any previous tasks in process
2936 step = "Waiting for previous operations to terminate"
2937 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2938
2939 self._write_ns_status(
2940 nsr_id=nsr_id,
2941 ns_state=None,
2942 current_operation="RUNNING ACTION",
2943 current_operation_id=nslcmop_id
2944 )
2945
2946 step = "Getting information from database"
2947 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2948 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2949
2950 nsr_deployed = db_nsr["_admin"].get("deployed")
2951 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2952 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2953 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2954 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2955 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2956
2957 if vnf_index:
2958 step = "Getting vnfr from database"
2959 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2960 step = "Getting vnfd from database"
2961 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2962 else:
2963 if db_nsr.get("nsd"):
2964 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2965 else:
2966 step = "Getting nsd from database"
2967 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2968
2969 # for backward compatibility
2970 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2971 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2972 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2973 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2974
2975 primitive = db_nslcmop["operationParams"]["primitive"]
2976 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2977
2978 # look for primitive
2979 config_primitive_desc = None
2980 if vdu_id:
2981 for vdu in get_iterable(db_vnfd, "vdu"):
2982 if vdu_id == vdu["id"]:
2983 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2984 if config_primitive["name"] == primitive:
2985 config_primitive_desc = config_primitive
2986 break
2987 elif kdu_name:
2988 self.logger.debug(logging_text + "Checking actions in KDUs")
2989 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2990 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2991 if primitive_params:
2992 desc_params.update(primitive_params)
2993 # TODO Check if we will need something at vnf level
2994 index = 0
2995 for kdu in get_iterable(nsr_deployed, "K8s"):
2996 if kdu_name == kdu["kdu-name"]:
2997 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2998 "path": "_admin.deployed.K8s.{}".format(index)}
2999 if primitive == "upgrade":
3000 if desc_params.get("kdu_model"):
3001 kdu_model = desc_params.get("kdu_model")
3002 del desc_params["kdu_model"]
3003 else:
3004 kdu_model = kdu.get("kdu-model")
3005 parts = kdu_model.split(sep=":")
3006 if len(parts) == 2:
3007 kdu_model = parts[0]
3008
3009 if kdu.get("k8scluster-type") == "chart":
3010 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
3011 kdu_instance=kdu.get("kdu-instance"),
3012 atomic=True, kdu_model=kdu_model,
3013 params=desc_params, db_dict=db_dict,
3014 timeout=300)
3015 elif kdu.get("k8scluster-type") == "juju":
3016 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
3017 kdu_instance=kdu.get("kdu-instance"),
3018 atomic=True, kdu_model=kdu_model,
3019 params=desc_params, db_dict=db_dict,
3020 timeout=300)
3021
3022 else:
3023 msg = "k8scluster-type not defined"
3024 raise LcmException(msg)
3025
3026 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
3027 break
3028 elif primitive == "rollback":
3029 if kdu.get("k8scluster-type") == "chart":
3030 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
3031 kdu_instance=kdu.get("kdu-instance"),
3032 db_dict=db_dict)
3033 elif kdu.get("k8scluster-type") == "juju":
3034 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
3035 kdu_instance=kdu.get("kdu-instance"),
3036 db_dict=db_dict)
3037 else:
3038 msg = "k8scluster-type not defined"
3039 raise LcmException(msg)
3040 break
3041 elif primitive == "status":
3042 if kdu.get("k8scluster-type") == "chart":
3043 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
3044 kdu_instance=kdu.get("kdu-instance"))
3045 elif kdu.get("k8scluster-type") == "juju":
3046 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
3047 kdu_instance=kdu.get("kdu-instance"))
3048 else:
3049 msg = "k8scluster-type not defined"
3050 raise LcmException(msg)
3051 break
3052 index += 1
3053
3054 else:
3055 raise LcmException("KDU '{}' not found".format(kdu_name))
3056 if output:
3057 db_nslcmop_update["detailed-status"] = output
3058 db_nslcmop_update["operationState"] = 'COMPLETED'
3059 db_nslcmop_update["statusEnteredTime"] = time()
3060 else:
3061 db_nslcmop_update["detailed-status"] = ''
3062 db_nslcmop_update["operationState"] = 'FAILED'
3063 db_nslcmop_update["statusEnteredTime"] = time()
3064 return
3065 elif vnf_index:
3066 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3067 if config_primitive["name"] == primitive:
3068 config_primitive_desc = config_primitive
3069 break
3070 else:
3071 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
3072 if config_primitive["name"] == primitive:
3073 config_primitive_desc = config_primitive
3074 break
3075
3076 if not config_primitive_desc:
3077 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3078 format(primitive))
3079
3080 desc_params = {}
3081 if vnf_index:
3082 if db_vnfr.get("additionalParamsForVnf"):
3083 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
3084 if vdu_id:
3085 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3086 if vdur.get("additionalParams"):
3087 desc_params = self._format_additional_params(vdur["additionalParams"])
3088 else:
3089 if db_nsr.get("additionalParamsForNs"):
3090 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
3091
3092 # TODO check if ns is in a proper status
3093 output, detail = await self._ns_execute_primitive(
3094 db_deployed=nsr_deployed,
3095 member_vnf_index=vnf_index,
3096 vdu_id=vdu_id,
3097 vdu_name=vdu_name,
3098 vdu_count_index=vdu_count_index,
3099 primitive=primitive,
3100 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
3101
3102 detailed_status = output
3103 if detail == 'OK':
3104 result = 'COMPLETED'
3105 else:
3106 result = 'FAILED'
3107
3108 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
3109 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
3110 db_nslcmop_update["statusEnteredTime"] = time()
3111 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
3112 return # database update is called inside finally
3113
3114 except (DbException, LcmException) as e:
3115 self.logger.error(logging_text + "Exit Exception {}".format(e))
3116 exc = e
3117 except asyncio.CancelledError:
3118 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3119 exc = "Operation was cancelled"
3120 except Exception as e:
3121 exc = traceback.format_exc()
3122 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3123 finally:
3124 if exc and db_nslcmop:
3125 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
3126 "FAILED {}: {}".format(step, exc)
3127 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3128 db_nslcmop_update["statusEnteredTime"] = time()
3129 try:
3130 if db_nslcmop_update:
3131 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3132 if db_nsr:
3133 db_nsr_update["_admin.nslcmop"] = None
3134 db_nsr_update["_admin.operation-type"] = None
3135 db_nsr_update["_admin.nslcmop"] = None
3136 db_nsr_update["_admin.current-operation"] = None
3137 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3138 self._write_ns_status(
3139 nsr_id=nsr_id,
3140 ns_state=None,
3141 current_operation="IDLE",
3142 current_operation_id=None
3143 )
3144 if exc:
3145 self._write_op_status(
3146 op_id=nslcmop_id,
3147 error_message=nslcmop_operation_state_detail
3148 )
3149 except DbException as e:
3150 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3151 self.logger.debug(logging_text + "Exit")
3152 if nslcmop_operation_state:
3153 try:
3154 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3155 "operationState": nslcmop_operation_state},
3156 loop=self.loop)
3157 except Exception as e:
3158 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3159 self.logger.debug(logging_text + "Exit")
3160 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3161 return nslcmop_operation_state, nslcmop_operation_state_detail
3162
3163 async def scale(self, nsr_id, nslcmop_id):
3164
3165 # Try to lock HA task here
3166 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3167 if not task_is_locked_by_me:
3168 return
3169
3170 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3171 self.logger.debug(logging_text + "Enter")
3172 # get all needed from database
3173 db_nsr = None
3174 db_nslcmop = None
3175 db_nslcmop_update = {}
3176 nslcmop_operation_state = None
3177 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
3178 "_admin.current-operation": nslcmop_id,
3179 "_admin.operation-type": "scale"}
3180 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3181 exc = None
3182 # in case of error, indicates what part of scale was failed to put nsr at error status
3183 scale_process = None
3184 old_operational_status = ""
3185 old_config_status = ""
3186 vnfr_scaled = False
3187 try:
3188 # wait for any previous tasks in process
3189 step = "Waiting for previous operations to terminate"
3190 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3191
3192 self._write_ns_status(
3193 nsr_id=nsr_id,
3194 ns_state=None,
3195 current_operation="SCALING",
3196 current_operation_id=nslcmop_id
3197 )
3198
3199 step = "Getting nslcmop from database"
3200 self.logger.debug(step + " after having waited for previous tasks to be completed")
3201 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3202 step = "Getting nsr from database"
3203 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3204
3205 old_operational_status = db_nsr["operational-status"]
3206 old_config_status = db_nsr["config-status"]
3207 step = "Parsing scaling parameters"
3208 # self.logger.debug(step)
3209 db_nsr_update["operational-status"] = "scaling"
3210 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3211 nsr_deployed = db_nsr["_admin"].get("deployed")
3212
3213 #######
3214 nsr_deployed = db_nsr["_admin"].get("deployed")
3215 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3216 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3217 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3218 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3219 #######
3220
3221 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3222 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3223 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3224 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3225 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3226
3227 # for backward compatibility
3228 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3229 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3230 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3231 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3232
3233 step = "Getting vnfr from database"
3234 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3235 step = "Getting vnfd from database"
3236 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3237
3238 step = "Getting scaling-group-descriptor"
3239 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3240 if scaling_descriptor["name"] == scaling_group:
3241 break
3242 else:
3243 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3244 "at vnfd:scaling-group-descriptor".format(scaling_group))
3245
3246 # cooldown_time = 0
3247 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3248 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3249 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3250 # break
3251
3252 # TODO check if ns is in a proper status
3253 step = "Sending scale order to VIM"
3254 nb_scale_op = 0
3255 if not db_nsr["_admin"].get("scaling-group"):
3256 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3257 admin_scale_index = 0
3258 else:
3259 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3260 if admin_scale_info["name"] == scaling_group:
3261 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3262 break
3263 else: # not found, set index one plus last element and add new entry with the name
3264 admin_scale_index += 1
3265 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3266 RO_scaling_info = []
3267 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3268 if scaling_type == "SCALE_OUT":
3269 # count if max-instance-count is reached
3270 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3271 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3272 if nb_scale_op >= max_instance_count:
3273 raise LcmException("reached the limit of {} (max-instance-count) "
3274 "scaling-out operations for the "
3275 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3276
3277 nb_scale_op += 1
3278 vdu_scaling_info["scaling_direction"] = "OUT"
3279 vdu_scaling_info["vdu-create"] = {}
3280 for vdu_scale_info in scaling_descriptor["vdu"]:
3281 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3282 "type": "create", "count": vdu_scale_info.get("count", 1)})
3283 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3284
3285 elif scaling_type == "SCALE_IN":
3286 # count if min-instance-count is reached
3287 min_instance_count = 0
3288 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3289 min_instance_count = int(scaling_descriptor["min-instance-count"])
3290 if nb_scale_op <= min_instance_count:
3291 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3292 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3293 nb_scale_op -= 1
3294 vdu_scaling_info["scaling_direction"] = "IN"
3295 vdu_scaling_info["vdu-delete"] = {}
3296 for vdu_scale_info in scaling_descriptor["vdu"]:
3297 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3298 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3299 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3300
3301 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3302 vdu_create = vdu_scaling_info.get("vdu-create")
3303 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3304 if vdu_scaling_info["scaling_direction"] == "IN":
3305 for vdur in reversed(db_vnfr["vdur"]):
3306 if vdu_delete.get(vdur["vdu-id-ref"]):
3307 vdu_delete[vdur["vdu-id-ref"]] -= 1
3308 vdu_scaling_info["vdu"].append({
3309 "name": vdur["name"],
3310 "vdu_id": vdur["vdu-id-ref"],
3311 "interface": []
3312 })
3313 for interface in vdur["interfaces"]:
3314 vdu_scaling_info["vdu"][-1]["interface"].append({
3315 "name": interface["name"],
3316 "ip_address": interface["ip-address"],
3317 "mac_address": interface.get("mac-address"),
3318 })
3319 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3320
3321 # PRE-SCALE BEGIN
3322 step = "Executing pre-scale vnf-config-primitive"
3323 if scaling_descriptor.get("scaling-config-action"):
3324 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3325 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3326 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3327 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3328 step = db_nslcmop_update["detailed-status"] = \
3329 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3330
3331 # look for primitive
3332 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3333 if config_primitive["name"] == vnf_config_primitive:
3334 break
3335 else:
3336 raise LcmException(
3337 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3338 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3339 "primitive".format(scaling_group, config_primitive))
3340
3341 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3342 if db_vnfr.get("additionalParamsForVnf"):
3343 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3344
3345 scale_process = "VCA"
3346 db_nsr_update["config-status"] = "configuring pre-scaling"
3347 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3348
3349 # Pre-scale reintent check: Check if this sub-operation has been executed before
3350 op_index = self._check_or_add_scale_suboperation(
3351 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3352 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3353 # Skip sub-operation
3354 result = 'COMPLETED'
3355 result_detail = 'Done'
3356 self.logger.debug(logging_text +
3357 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3358 vnf_config_primitive, result, result_detail))
3359 else:
3360 if (op_index == self.SUBOPERATION_STATUS_NEW):
3361 # New sub-operation: Get index of this sub-operation
3362 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3363 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3364 format(vnf_config_primitive))
3365 else:
3366 # Reintent: Get registered params for this existing sub-operation
3367 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3368 vnf_index = op.get('member_vnf_index')
3369 vnf_config_primitive = op.get('primitive')
3370 primitive_params = op.get('primitive_params')
3371 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3372 format(vnf_config_primitive))
3373 # Execute the primitive, either with new (first-time) or registered (reintent) args
3374 result, result_detail = await self._ns_execute_primitive(
3375 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3376 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3377 vnf_config_primitive, result, result_detail))
3378 # Update operationState = COMPLETED | FAILED
3379 self._update_suboperation_status(
3380 db_nslcmop, op_index, result, result_detail)
3381
3382 if result == "FAILED":
3383 raise LcmException(result_detail)
3384 db_nsr_update["config-status"] = old_config_status
3385 scale_process = None
3386 # PRE-SCALE END
3387
3388 # SCALE RO - BEGIN
3389 # Should this block be skipped if 'RO_nsr_id' == None ?
3390 # if (RO_nsr_id and RO_scaling_info):
3391 if RO_scaling_info:
3392 scale_process = "RO"
3393 # Scale RO reintent check: Check if this sub-operation has been executed before
3394 op_index = self._check_or_add_scale_suboperation(
3395 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3396 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3397 # Skip sub-operation
3398 result = 'COMPLETED'
3399 result_detail = 'Done'
3400 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3401 result, result_detail))
3402 else:
3403 if (op_index == self.SUBOPERATION_STATUS_NEW):
3404 # New sub-operation: Get index of this sub-operation
3405 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3406 self.logger.debug(logging_text + "New sub-operation RO")
3407 else:
3408 # Reintent: Get registered params for this existing sub-operation
3409 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3410 RO_nsr_id = op.get('RO_nsr_id')
3411 RO_scaling_info = op.get('RO_scaling_info')
3412 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3413 vnf_config_primitive))
3414
3415 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3416 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3417 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3418 # wait until ready
3419 RO_nslcmop_id = RO_desc["instance_action_id"]
3420 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3421
3422 RO_task_done = False
3423 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3424 detailed_status_old = None
3425 self.logger.debug(logging_text + step)
3426
3427 deployment_timeout = 1 * 3600 # One hour
3428 while deployment_timeout > 0:
3429 if not RO_task_done:
3430 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3431 extra_item_id=RO_nslcmop_id)
3432
3433 # deploymentStatus
3434 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3435
3436 ns_status, ns_status_info = self.RO.check_action_status(desc)
3437 if ns_status == "ERROR":
3438 raise ROclient.ROClientException(ns_status_info)
3439 elif ns_status == "BUILD":
3440 detailed_status = step + "; {}".format(ns_status_info)
3441 elif ns_status == "ACTIVE":
3442 RO_task_done = True
3443 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3444 self.logger.debug(logging_text + step)
3445 else:
3446 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3447 else:
3448
3449 if ns_status == "ERROR":
3450 raise ROclient.ROClientException(ns_status_info)
3451 elif ns_status == "BUILD":
3452 detailed_status = step + "; {}".format(ns_status_info)
3453 elif ns_status == "ACTIVE":
3454 step = detailed_status = \
3455 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3456 if not vnfr_scaled:
3457 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3458 vnfr_scaled = True
3459 try:
3460 desc = await self.RO.show("ns", RO_nsr_id)
3461
3462 # deploymentStatus
3463 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3464
3465 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3466 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3467 break
3468 except LcmExceptionNoMgmtIP:
3469 pass
3470 else:
3471 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3472 if detailed_status != detailed_status_old:
3473 self._update_suboperation_status(
3474 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3475 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3476 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3477
3478 await asyncio.sleep(5, loop=self.loop)
3479 deployment_timeout -= 5
3480 if deployment_timeout <= 0:
3481 self._update_suboperation_status(
3482 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3483 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3484
3485 # update VDU_SCALING_INFO with the obtained ip_addresses
3486 if vdu_scaling_info["scaling_direction"] == "OUT":
3487 for vdur in reversed(db_vnfr["vdur"]):
3488 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3489 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3490 vdu_scaling_info["vdu"].append({
3491 "name": vdur["name"],
3492 "vdu_id": vdur["vdu-id-ref"],
3493 "interface": []
3494 })
3495 for interface in vdur["interfaces"]:
3496 vdu_scaling_info["vdu"][-1]["interface"].append({
3497 "name": interface["name"],
3498 "ip_address": interface["ip-address"],
3499 "mac_address": interface.get("mac-address"),
3500 })
3501 del vdu_scaling_info["vdu-create"]
3502
3503 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3504 # SCALE RO - END
3505
3506 scale_process = None
3507 if db_nsr_update:
3508 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3509
3510 # POST-SCALE BEGIN
3511 # execute primitive service POST-SCALING
3512 step = "Executing post-scale vnf-config-primitive"
3513 if scaling_descriptor.get("scaling-config-action"):
3514 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3515 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3516 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3517 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3518 step = db_nslcmop_update["detailed-status"] = \
3519 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3520
3521 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3522 if db_vnfr.get("additionalParamsForVnf"):
3523 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3524
3525 # look for primitive
3526 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3527 if config_primitive["name"] == vnf_config_primitive:
3528 break
3529 else:
3530 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3531 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3532 "match any vnf-configuration:config-primitive".format(scaling_group,
3533 config_primitive))
3534 scale_process = "VCA"
3535 db_nsr_update["config-status"] = "configuring post-scaling"
3536 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3537
3538 # Post-scale reintent check: Check if this sub-operation has been executed before
3539 op_index = self._check_or_add_scale_suboperation(
3540 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3541 if op_index == self.SUBOPERATION_STATUS_SKIP:
3542 # Skip sub-operation
3543 result = 'COMPLETED'
3544 result_detail = 'Done'
3545 self.logger.debug(logging_text +
3546 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3547 format(vnf_config_primitive, result, result_detail))
3548 else:
3549 if op_index == self.SUBOPERATION_STATUS_NEW:
3550 # New sub-operation: Get index of this sub-operation
3551 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3552 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3553 format(vnf_config_primitive))
3554 else:
3555 # Reintent: Get registered params for this existing sub-operation
3556 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3557 vnf_index = op.get('member_vnf_index')
3558 vnf_config_primitive = op.get('primitive')
3559 primitive_params = op.get('primitive_params')
3560 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3561 format(vnf_config_primitive))
3562 # Execute the primitive, either with new (first-time) or registered (reintent) args
3563 result, result_detail = await self._ns_execute_primitive(
3564 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3565 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3566 vnf_config_primitive, result, result_detail))
3567 # Update operationState = COMPLETED | FAILED
3568 self._update_suboperation_status(
3569 db_nslcmop, op_index, result, result_detail)
3570
3571 if result == "FAILED":
3572 raise LcmException(result_detail)
3573 db_nsr_update["config-status"] = old_config_status
3574 scale_process = None
3575 # POST-SCALE END
3576
3577 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3578 db_nslcmop_update["statusEnteredTime"] = time()
3579 db_nslcmop_update["detailed-status"] = "done"
3580 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3581 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3582 else old_operational_status
3583 db_nsr_update["config-status"] = old_config_status
3584 return
3585 except (ROclient.ROClientException, DbException, LcmException) as e:
3586 self.logger.error(logging_text + "Exit Exception {}".format(e))
3587 exc = e
3588 except asyncio.CancelledError:
3589 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3590 exc = "Operation was cancelled"
3591 except Exception as e:
3592 exc = traceback.format_exc()
3593 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3594 finally:
3595 self._write_ns_status(
3596 nsr_id=nsr_id,
3597 ns_state=None,
3598 current_operation="IDLE",
3599 current_operation_id=None
3600 )
3601 if exc:
3602 if db_nslcmop:
3603 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3604 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3605 db_nslcmop_update["statusEnteredTime"] = time()
3606 if db_nsr:
3607 db_nsr_update["operational-status"] = old_operational_status
3608 db_nsr_update["config-status"] = old_config_status
3609 db_nsr_update["detailed-status"] = ""
3610 db_nsr_update["_admin.nslcmop"] = None
3611 if scale_process:
3612 if "VCA" in scale_process:
3613 db_nsr_update["config-status"] = "failed"
3614 if "RO" in scale_process:
3615 db_nsr_update["operational-status"] = "failed"
3616 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3617 exc)
3618 try:
3619 if db_nslcmop and db_nslcmop_update:
3620 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3621 if db_nsr:
3622 db_nsr_update["_admin.current-operation"] = None
3623 db_nsr_update["_admin.operation-type"] = None
3624 db_nsr_update["_admin.nslcmop"] = None
3625 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3626
3627 self._write_ns_status(
3628 nsr_id=nsr_id,
3629 ns_state=None,
3630 current_operation="IDLE",
3631 current_operation_id=None
3632 )
3633
3634 except DbException as e:
3635 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3636 if nslcmop_operation_state:
3637 try:
3638 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3639 "operationState": nslcmop_operation_state},
3640 loop=self.loop)
3641 # if cooldown_time:
3642 # await asyncio.sleep(cooldown_time, loop=self.loop)
3643 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3644 except Exception as e:
3645 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3646 self.logger.debug(logging_text + "Exit")
3647 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")