fix issues with k8scluster and repos
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
class NsLcm(LcmBase):
    """
    Network Service (NS) handler built on top of LcmBase.
    The methods of this class translate OSM descriptors and instantiation parameters into the format used by RO,
    and keep the 'nsrs' and 'vnfrs' database records updated with the information reported by RO and N2VC.
    """
    # Default timeouts. NOTE(review): presumably expressed in seconds - confirm against the callers
    timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600 # default global timeout for deploying a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 10 * 60 # timeout for primitive execution

    # Negative sentinel codes for sub-operation lookups. Their consumers are not visible in this part of the file
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3

55
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Stores the shared resources and builds the connectors used by the NS operations: N2VC (juju), the helm and
    juju K8s connectors and the RO client
    :param db: database connection, handed to the base class and to the connectors
    :param msg: messaging connection, handed to the base class
    :param fs: filesystem storage, handed to the base class and to the connectors
    :param lcm_tasks: task registry, stored at self.lcm_tasks
    :param config: two level dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read
    :param loop: asyncio event loop, handed to N2VC and to the RO client
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy, so the caller's configuration is not shared with the connectors
    self.vca_config = config["VCA"].copy()

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=self.vca_config.get('user'),
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db
    )

    # both K8s connectors share the same kubectl binary
    kubectl_path = self.vca_config.get("kubectlpath")

    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)

107
108 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
109
110 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
111
112 try:
113 # TODO filter RO descriptor fields...
114
115 # write to database
116 db_dict = dict()
117 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
118 db_dict['deploymentStatus'] = ro_descriptor
119 self.update_db_2("nsrs", nsrs_id, db_dict)
120
121 except Exception as e:
122 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
124 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
125
126 # remove last dot from path (if exists)
127 if path.endswith('.'):
128 path = path[:-1]
129
130 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
131 # .format(table, filter, path, updated_data))
132
133 try:
134
135 nsr_id = filter.get('_id')
136
137 # read ns record from database
138 nsr = self.db.get_one(table='nsrs', q_filter=filter)
139 current_ns_status = nsr.get('nsState')
140
141 # get vca status for NS
142 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
143
144 # vcaStatus
145 db_dict = dict()
146 db_dict['vcaStatus'] = status_dict
147
148 # update configurationStatus for this VCA
149 try:
150 vca_index = int(path[path.rfind(".")+1:])
151
152 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
153 vca_status = vca_list[vca_index].get('status')
154
155 configuration_status_list = nsr.get('configurationStatus')
156 config_status = configuration_status_list[vca_index].get('status')
157
158 if config_status == 'BROKEN' and vca_status != 'failed':
159 db_dict['configurationStatus'][vca_index] = 'READY'
160 elif config_status != 'BROKEN' and vca_status == 'failed':
161 db_dict['configurationStatus'][vca_index] = 'BROKEN'
162 except Exception as e:
163 # not update configurationStatus
164 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
165
166 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
167 # if nsState = 'DEGRADED' check if all is OK
168 is_degraded = False
169 if current_ns_status in ('READY', 'DEGRADED'):
170 error_description = ''
171 # check machines
172 if status_dict.get('machines'):
173 for machine_id in status_dict.get('machines'):
174 machine = status_dict.get('machines').get(machine_id)
175 # check machine agent-status
176 if machine.get('agent-status'):
177 s = machine.get('agent-status').get('status')
178 if s != 'started':
179 is_degraded = True
180 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
181 # check machine instance status
182 if machine.get('instance-status'):
183 s = machine.get('instance-status').get('status')
184 if s != 'running':
185 is_degraded = True
186 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
187 # check applications
188 if status_dict.get('applications'):
189 for app_id in status_dict.get('applications'):
190 app = status_dict.get('applications').get(app_id)
191 # check application status
192 if app.get('status'):
193 s = app.get('status').get('status')
194 if s != 'active':
195 is_degraded = True
196 error_description += 'application {} status={} ; '.format(app_id, s)
197
198 if error_description:
199 db_dict['errorDescription'] = error_description
200 if current_ns_status == 'READY' and is_degraded:
201 db_dict['nsState'] = 'DEGRADED'
202 if current_ns_status == 'DEGRADED' and not is_degraded:
203 db_dict['nsState'] = 'READY'
204
205 # write to database
206 self.update_db_2("nsrs", nsr_id, db_dict)
207
208 except Exception as e:
209 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
210
211 return
212
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Builds the vnfd descriptor to be sent to RO from an OSM IM vnfd. The input vnfd is not modified.
    The cloud-init content of every vdu (inline or read from the package file) is rendered with Jinja2
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as the Jinja2 variables
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd, without the keys not used by RO
    :raises LcmException: if the cloud-init file cannot be read, a template variable is not provided or the
        template cannot be processed
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        render_params = additionalParams or {}

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable used at the template must come in the instantiation parameters
            parsed_content = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_content):
                if var not in render_params:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(render_params)

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))

270
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map member-vnf-index to vnfd
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: keys injected as 'mgmt_keys' in the vdus that require ssh access. Can be None
    :return: The RO ns descriptor. None if ns_params is empty
    :raises LcmException: if a VIM/WIM account is not ENABLED, or if the params reference a member-vnf-index,
        connection point or interface not present at the descriptors
    """
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO one; results are cached at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # translates an OSM wim account id into the RO one; non string values are passed as they are
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # renames the OSM ip-profile keys/values into the ones used by RO, over a copy
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the n2vc keys in the vdus that need ssh access: the ones with a vdu-configuration that requires it,
    # and the management vdu when it is the vnf-configuration the one that requires it
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # a vdu without interfaces cannot hold the management cp; do not fail iterating over None
                for vdu_interface in vdu.get("interface") or ():
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    # NOTE(review): kept inside the loop, where RO_vld_ns_net is always bound - confirm nesting
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor hold here the matched vdu and interface
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params

541
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes entries of the 'vdur' list of a vnfr, and writes the resulting list at database.
    New entries are a copy of the last existing vdur with the same 'vdu-id-ref', with a new '_id' and the
    following 'count-index'. Entries with 'pdu-type' are never touched
    :param db_vnfr: content of the vnfr. Its 'vdur' entry is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. It is not modified
    :return: None
    :raises LcmException: if some requested creation or deletion cannot be applied because there is not any
        (more) existing vdur for this vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk from the end, so inserting or deleting does not move the positions that are still pending to visit
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new vdur is cloned from the previous one, so count-index grows one by one
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)

580
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the information reported by RO for its network: vim-id, name, status and
    status-detailed
    :param ns_update_nsr: dictionary to be filled with the updated info, with one 'vld.<index>' key per vld
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised when a vld is not found at the RO nets
    """

    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that refers to this vld
        matching_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if net.get("ns_net_osm_id") == ns_vld["id"]),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))
        ns_vld["vim-id"] = matching_net.get("vim_net_id")
        ns_vld["name"] = matching_net.get("vim_name")
        ns_vld["status"] = matching_net.get("status")
        ns_vld["status-detailed"] = matching_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = ns_vld

602
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Fills every vnfr with the information reported by RO (management ip address, and the vim identifiers, names,
    addresses and status of its vdur and vld), and writes the changes at the 'vnfrs' database table.
    The content of db_vnfrs is updated too
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised when a vnf, vdur, interface or vld is not found at the RO info;
        LcmExceptionNoMgmtIP when a vnf with vdur has no ip address
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO can report several addresses separated by ';', the first one is taken
                mgmt_ip = vnf_RO["ip_address"].split(";")[0]
                db_vnfr["ip-address"] = mgmt_ip
                vnfr_update["ip-address"] = mgmt_ip
            elif not db_vnfr.get("ip-address") and db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if vdur.get("pdu-type"):
                    continue
                # position of the RO vm among the ones with the same vdu id
                same_vdu_position = 0
                for vm_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vm_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != same_vdu_position:
                        same_vdu_position += 1
                        continue
                    vdur["vim-id"] = vm_RO.get("vim_vm_id")
                    vm_ip = vm_RO.get("ip_address")
                    vdur["ip-address"] = vm_ip.split(";")[0] if vm_ip else None
                    vdur["vdu-id-ref"] = vm_RO.get("vdu_osm_id")
                    vdur["name"] = vm_RO.get("vim_name")
                    vdur["status"] = vm_RO.get("status")
                    vdur["status-detailed"] = vm_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for iface_RO in get_iterable(vm_RO, "interfaces"):
                            if ifacer["name"] == iface_RO.get("internal_name"):
                                ifacer["ip-address"] = iface_RO.get("ip_address")
                                ifacer["mac-address"] = iface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vnf_vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vnf_vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vnf_vld["vim-id"] = net_RO.get("vim_net_id")
                    vnf_vld["name"] = net_RO.get("vim_name")
                    vnf_vld["status"] = net_RO.get("status")
                    vnf_vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vnf_vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vnf_vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))

675
676 def _get_ns_config_info(self, nsr_id):
677 """
678 Generates a mapping between vnf,vdu elements and the N2VC id
679 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
680 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
681 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
682 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
683 """
684 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
685 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
686 mapping = {}
687 ns_config_info = {"osm-config-mapping": mapping}
688 for vca in vca_deployed_list:
689 if not vca["member-vnf-index"]:
690 continue
691 if not vca["vdu_id"]:
692 mapping[vca["member-vnf-index"]] = vca["application"]
693 else:
694 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
695 vca["application"]
696 return ns_config_info
697
698 @staticmethod
699 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
700 """
701 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
702 primitives as verify-ssh-credentials, or config when needed
703 :param desc_primitive_list: information of the descriptor
704 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
705 this element contains a ssh public key
706 :return: The modified list. Can ba an empty list, but always a list
707 """
708 if desc_primitive_list:
709 primitive_list = desc_primitive_list.copy()
710 else:
711 primitive_list = []
712 # look for primitive config, and get the position. None if not present
713 config_position = None
714 for index, primitive in enumerate(primitive_list):
715 if primitive["name"] == "config":
716 config_position = index
717 break
718
719 # for NS, add always a config primitive if not present (bug 874)
720 if not vca_deployed["member-vnf-index"] and config_position is None:
721 primitive_list.insert(0, {"name": "config", "parameter": []})
722 config_position = 0
723 # for VNF/VDU add verify-ssh-credentials after config
724 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
725 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
726 return primitive_list
727
728 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
729 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
730
731 db_nsr_update = {}
732 RO_descriptor_number = 0 # number of descriptors created at RO
733 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
734 start_deploy = time()
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # Check for and optionally request placement optimization. Database will be updated if placement activated
742 await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
743
744 # deploy RO
745
746 # get vnfds, instantiate at RO
747
748 for c_vnf in nsd.get("constituent-vnfd", ()):
749 member_vnf_index = c_vnf["member-vnf-index"]
750 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
751 vnfd_ref = vnfd["id"]
752 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
753 " RO".format(vnfd_ref, member_vnf_index)
754 # self.logger.debug(logging_text + step)
755 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
756 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
757 RO_descriptor_number += 1
758
759 # look position at deployed.RO.vnfd if not present it will be appended at the end
760 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
761 if vnf_deployed["member-vnf-index"] == member_vnf_index:
762 break
763 else:
764 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
765 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
766
767 # look if present
768 RO_update = {"member-vnf-index": member_vnf_index}
769 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
770 if vnfd_list:
771 RO_update["id"] = vnfd_list[0]["uuid"]
772 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
773 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
774 else:
775 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
776 get("additionalParamsForVnf"), nsr_id)
777 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
778 RO_update["id"] = desc["uuid"]
779 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
780 vnfd_ref, member_vnf_index, desc["uuid"]))
781 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
782 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
783 self.update_db_2("nsrs", nsr_id, db_nsr_update)
784
785 # create nsd at RO
786 nsd_ref = nsd["id"]
787 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
788 # self.logger.debug(logging_text + step)
789
790 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
791 RO_descriptor_number += 1
792 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
793 if nsd_list:
794 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
795 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
796 nsd_ref, RO_nsd_uuid))
797 else:
798 nsd_RO = deepcopy(nsd)
799 nsd_RO["id"] = RO_osm_nsd_id
800 nsd_RO.pop("_id", None)
801 nsd_RO.pop("_admin", None)
802 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
803 member_vnf_index = c_vnf["member-vnf-index"]
804 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
805 for c_vld in nsd_RO.get("vld", ()):
806 for cp in c_vld.get("vnfd-connection-point-ref", ()):
807 member_vnf_index = cp["member-vnf-index-ref"]
808 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
809
810 desc = await self.RO.create("nsd", descriptor=nsd_RO)
811 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
812 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
813 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
814 self.update_db_2("nsrs", nsr_id, db_nsr_update)
815
816 # Crate ns at RO
817 # if present use it unless in error status
818 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
819 if RO_nsr_id:
820 try:
821 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
822 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
823 desc = await self.RO.show("ns", RO_nsr_id)
824
825 except ROclient.ROClientException as e:
826 if e.http_code != HTTPStatus.NOT_FOUND:
827 raise
828 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
829 if RO_nsr_id:
830 ns_status, ns_status_info = self.RO.check_ns_status(desc)
831 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
832 if ns_status == "ERROR":
833 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
834 .format(RO_nsr_id)
835 self.logger.debug(logging_text + step)
836 await self.RO.delete("ns", RO_nsr_id)
837 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
838 if not RO_nsr_id:
839 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
840 # self.logger.debug(logging_text + step)
841
842 # check if VIM is creating and wait look if previous tasks in process
843 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
844 if task_dependency:
845 step = "Waiting for related tasks to be completed: {}".format(task_name)
846 self.logger.debug(logging_text + step)
847 await asyncio.wait(task_dependency, timeout=3600)
848 if ns_params.get("vnf"):
849 for vnf in ns_params["vnf"]:
850 if "vimAccountId" in vnf:
851 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
852 vnf["vimAccountId"])
853 if task_dependency:
854 step = "Waiting for related tasks to be completed: {}".format(task_name)
855 self.logger.debug(logging_text + step)
856 await asyncio.wait(task_dependency, timeout=3600)
857
858 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
859
860 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
861
862 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
863 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
864 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
865 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
866 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
867 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
868 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
869 self.update_db_2("nsrs", nsr_id, db_nsr_update)
870
871 # wait until NS is ready
872 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
873 detailed_status_old = None
874 self.logger.debug(logging_text + step)
875
876 old_desc = None
877 while time() <= start_deploy + timeout_ns_deploy:
878 desc = await self.RO.show("ns", RO_nsr_id)
879
880 # deploymentStatus
881 if desc != old_desc:
882 # desc has changed => update db
883 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
884 old_desc = desc
885
886 ns_status, ns_status_info = self.RO.check_ns_status(desc)
887 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
888 if ns_status == "ERROR":
889 raise ROclient.ROClientException(ns_status_info)
890 elif ns_status == "BUILD":
891 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
892 elif ns_status == "ACTIVE":
893 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
894 try:
895 self.ns_update_vnfr(db_vnfrs, desc)
896 break
897 except LcmExceptionNoMgmtIP:
898 pass
899 else:
900 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
901 if detailed_status != detailed_status_old:
902 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 await asyncio.sleep(5, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise ROclient.ROClientException("Timeout waiting ns to be ready")
907
908 step = "Updating NSR"
909 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
910
911 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
912 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
913 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
915 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
916
917 step = "Deployed at VIM"
918 self.logger.debug(logging_text + step)
919
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for the IP address at RO and, optionally, insert a public ssh key in the virtual machine.

    Every 10 seconds the database is polled: first until the nsrs record contains _admin.deployed.RO.nsr_id,
    then until the target vdur of the vnfrs record reports an IP address. When both pub_key and user are given,
    the key is injected with an RO action over the ns; ROClientException errors are retried up to 20 times.

    :param logging_text: prefix used for logging
    :param nsr_id: _id of the nsrs record to poll
    :param vnfr_id: _id of the vnfrs record to poll
    :param vdu_id: vdu-id-ref of the target vdur. If empty the vdur holding the VNF "ip-address" is targeted
    :param vdu_index: count-index of the target vdur
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, if the target vdur is not found or is in ERROR status, if RO returns an
        unknown response or if the maximum number of key injection tries is reached
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            # the ns is known at RO; what did not arrive on time is the IP address or the key injection
            raise LcmException("Timeout waiting for IP address or ssh key injection. vnfr_id={}, vdu_id={}, "
                               "vdu_index={}".format(vnfr_id, vdu_id, vdu_index))

        # no 'loop' argument: it was removed from asyncio.sleep in python 3.10 (the running loop is used)
        await asyncio.sleep(10)

        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        if not (pub_key and user):
            # nothing to inject, the IP address is all what was requested
            break

        # inject public key into machine
        # self.logger.debug(logging_text + "Inserting RO key")
        if vdur.get("pdu-type"):
            self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
            return ip_address
        try:
            ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
            result_dict = await self.RO.create_action(
                item="ns",
                item_id_name=ro_nsr_id,
                descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
            )
            # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
            if not result_dict or not isinstance(result_dict, dict):
                raise LcmException("Unknown response from RO when injecting key")
            for result in result_dict.values():
                if result.get("vim_result") == 200:
                    break
                else:
                    raise ROclient.ROClientException("error injecting key: {}".format(
                        result.get("description")))
            break
        except ROclient.ROClientException as e:
            # retried at next iteration, after the 10 seconds sleep at the top of the loop
            if not nb_tries:
                self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                  format(e, 20*10))
            nb_tries += 1
            if nb_tries >= 20:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e

    return ip_address
1018
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

    The nsrs record is polled every 10 seconds. A dependency is finished when its configurationStatus entry has
    status 'READY'.

    :param nsr_id: _id of the nsrs record to poll
    :param vca_deployed_list: content of _admin.deployed.VCA; used to get the entry of this VCA
    :param vca_index: position of this VCA at _admin.deployed.VCA and at configurationStatus
    :return: None, once there are no pending dependencies
    :raises LcmException: if a dependency is at status 'BROKEN' or if dependencies are not ready on time
    """
    my_vca = vca_deployed_list[vca_index]
    if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return
    my_member_vnf_index = my_vca.get("member-vnf-index")
    # NOTE(review): this counter is decreased by 1 on every 10 seconds poll, so the real limit is about 50
    # minutes and not 300 seconds. Kept as it was; confirm the intended value
    timeout = 300
    while timeout >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        configuration_status_list = db_nsr["configurationStatus"]
        for index, config_status in enumerate(configuration_status_list):
            if index == vca_index:
                # myself
                continue
            if my_member_vnf_index:
                # VNF level: only elements of the same VNF are dependencies. "member-vnf-index" is stored at the
                # VCA entry with the same index, so look at it there (and also at the status entry, as before)
                peer_vca = None
                if isinstance(vca_deployed_list, list) and index < len(vca_deployed_list):
                    peer_vca = vca_deployed_list[index]
                peer_member_vnf_indexes = ((peer_vca or {}).get("member-vnf-index"),
                                           config_status.get("member-vnf-index"))
                if my_member_vnf_index not in peer_member_vnf_indexes:
                    continue
            internal_status = config_status.get("status")
            if internal_status == 'READY':
                continue
            elif internal_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            else:
                break
        else:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        timeout -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1052
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
                           vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
    """
    Deploy the charm of one VCA element (NS, VNF or VDU) and execute its initial config primitives.

    Steps: create (proxy charm) or register (native charm) the execution environment, store model, application
    and ee_id at database, install the configuration software, add relations, wait for the VM IP address
    (injecting the ssh public key when required) and run every 'initial-config-primitive'.
    The progress is written at configurationStatus.<vca_index> of the nsrs record.

    :param logging_text: prefix used for logging
    :param vca_index: position of this element at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: id of the network slice instance, or None. Used for the namespace
    :param db_nsr: content of the nsrs record
    :param db_vnfr: content of the vnfrs record. Empty/None for a NS level charm
    :param vdu_id: id of the VDU. Empty/None for VNF and NS level charms
    :param kdu_name: not used by this method
    :param vdu_index: index of the VDU instance
    :param config_descriptor: configuration section of the descriptor, it contains "juju" and optionally
        "config-access" and "initial-config-primitive"
    :param deploy_params: params for the primitives. "rw_mgmt_ip" (and "ns_config_info" for NS level) are
        added to it
    :param base_folder: dictionary with "folder" and "pkg-dir" where the charm is stored
    :param nslcmop_id: _id of the operation, used to update its stage
    :return: None
    :raises Exception: on any failure, prefixed with the step that has failed. Status is set to 'BROKEN'
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = vca_deployed_list[vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor contains juju.proxy explicitly set to False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            # native charm: the VM must be up before registering it as execution environment
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                # sorted copy: the list of the descriptor is shared, so it is not modified in place
                initial_config_primitive_list = sorted(initial_config_primitive_list,
                                                       key=lambda val: int(val['seq']))
            except Exception as e:
                # best effort: continue with the order of the descriptor
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        if vca_deployed.get("vdu_id") or vca_deployed.get("kdu_name"):
            # VDU or KDU
            stage = 'Stage 3/5: running Day-1 primitives for VDU'
        elif vca_deployed.get("member-vnf-index"):
            # VNF
            stage = 'Stage 4/5: running Day-1 primitives for VNF'
        else:
            # NS
            stage = 'Stage 5/5: running Day-1 primitives for NS'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # .get(): same criteria used above for the stage, a missing "member-vnf-index" means NS level
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1295
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None):
    """
    Write the NS state and the current operation at the nsrs record. Best effort: errors are only logged.

    :param nsr_id: _id of the nsrs record to update
    :param ns_state: value for "nsState". If empty/None "nsState" is not modified
    :param current_operation: value for "currentOperation"
    :param current_operation_id: value for "currentOperationID"
    :param error_description: value for "errorDescription". It is always written, None by default
    :return: None
    """
    try:
        db_dict = {
            "currentOperation": current_operation,
            "currentOperationID": current_operation_id,
            "errorDescription": error_description,
        }
        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); it would make this handler fail
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1308
def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
    """
    Write the progress of an operation at the nslcmops record. Best effort: errors are only logged.

    :param op_id: _id of the nslcmops record to update
    :param stage: value for "stage". It is always written, None by default
    :param error_message: value for "errorMessage". If empty/None "errorMessage" is not modified
    :param queuePosition: value for "queuePosition"
    :return: None
    """
    try:
        db_dict = {'queuePosition': queuePosition, 'stage': stage}
        if error_message:
            db_dict['errorMessage'] = error_message
        self.update_db_2("nslcmops", op_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); it would make this handler fail
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1319
def _write_all_config_status(self, nsr_id: str, status: str):
    """
    Set the same status at every entry of "configurationStatus" of the nsrs record.

    The record is read from database and, if "configurationStatus" is not empty, the whole list is written
    back with the new status. Best effort: errors are only logged.

    :param nsr_id: _id of the nsrs record to update
    :param status: value for "status" of every entry
    :return: None
    """
    try:
        # nsrs record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # update status
            for entry in config_status:
                entry['status'] = status
            self.update_db_2("nsrs", nsr_id, {'configurationStatus': list(config_status)})

    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); it would make this handler fail
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1337
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None):
    """
    Write the entry "configurationStatus.<vca_index>" of the nsrs record. Best effort: errors are only logged.

    Only the provided (not empty) values are written.

    :param nsr_id: _id of the nsrs record to update
    :param vca_index: position at "configurationStatus" of the entry to update
    :param status: value for "status"
    :param element_under_configuration: value for "elementUnderConfiguration"
    :param element_type: value for "elementType"
    :return: None
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        values = (('status', status),
                  ('elementUnderConfiguration', element_under_configuration),
                  ('elementType', element_type))
        db_dict = {db_path + key: value for key, value in values if value}
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); it would make this handler fail
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1357
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request the placement of the VNFs to the placement module when the operation asks for it.

    Nothing is done unless operationParams.placement-engine is "PLA". In that case a "get_placement" message
    is written at topic "pla" and the nslcmops record is polled every 5 seconds until "_admin.pla" is present.
    Then "vim-account-id" of every placed vnfr is written at database.

    :param logging_text: prefix used for logging
    :param db_nslcmop: content of the nslcmops record
    :param db_vnfrs: dictionary of vnfrs records indexed by member-vnf-index
    :return: None
    :raises LcmException: if the placement result is not received on time
    """
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        return

    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)

    # poll database until the result is stored at the operation record
    poll_seconds = 5
    remaining_seconds = poll_seconds * 4
    placement = None
    while not placement and remaining_seconds >= 0:
        await asyncio.sleep(poll_seconds)
        remaining_seconds -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))

    # apply the vim account decided for every vnf known by this ns
    for placed_vnf in placement['vnf']:
        vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        if placed_vnf.get('vimAccountId') and vnfr:
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": placed_vnf['vimAccountId']})
1381
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at "_admin.pla" of the nslcmops record. Best effort: errors are only logged.

    :param params: dictionary with the key "placement", whose content is stored. The operation is identified
        by "nslcmopId" inside "placement"
    :return: None
    """
    # bound before the 'try' so the error handler can always reference it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); it would make this handler fail
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1388
1389 async def instantiate(self, nsr_id, nslcmop_id):
1390 """
1391
1392 :param nsr_id: ns instance to deploy
1393 :param nslcmop_id: operation to run
1394 :return:
1395 """
1396
1397 # Try to lock HA task here
1398 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1399 if not task_is_locked_by_me:
1400 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1401 return
1402
1403 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1404 self.logger.debug(logging_text + "Enter")
1405
1406 # get all needed from database
1407
1408 # database nsrs record
1409 db_nsr = None
1410
1411 # database nslcmops record
1412 db_nslcmop = None
1413
1414 # update operation on nsrs
1415 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1416 "_admin.current-operation": nslcmop_id,
1417 "_admin.operation-type": "instantiate"}
1418 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1419
1420 # update operation on nslcmops
1421 db_nslcmop_update = {}
1422
1423 nslcmop_operation_state = None
1424 db_vnfrs = {} # vnf's info indexed by member-index
1425 # n2vc_info = {}
1426 task_instantiation_list = []
1427 task_instantiation_info = {} # from task to info text
1428 exc = None
1429 try:
1430 # wait for any previous tasks in process
1431 step = "Waiting for previous operations to terminate"
1432 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1433
1434 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1435
1436 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1437 self._write_ns_status(
1438 nsr_id=nsr_id,
1439 ns_state="BUILDING",
1440 current_operation="INSTANTIATING",
1441 current_operation_id=nslcmop_id
1442 )
1443
1444 # read from db: operation
1445 step = "Getting nslcmop={} from db".format(nslcmop_id)
1446 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1447 ns_params = db_nslcmop.get("operationParams")
1448 if ns_params and ns_params.get("timeout_ns_deploy"):
1449 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1450 else:
1451 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1452
1453 # read from db: ns
1454 step = "Getting nsr={} from db".format(nsr_id)
1455 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1456 # nsd is replicated into ns (no db read)
1457 nsd = db_nsr["nsd"]
1458 # nsr_name = db_nsr["name"] # TODO short-name??
1459
1460 # read from db: vnf's of this ns
1461 step = "Getting vnfrs from db"
1462 self.logger.debug(logging_text + step)
1463 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1464
1465 # read from db: vnfd's for every vnf
1466 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1467 db_vnfds = {} # every vnfd data indexed by vnf id
1468 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1469
1470 self._write_op_status(
1471 op_id=nslcmop_id,
1472 stage='Stage 1/5: preparation of the environment',
1473 queuePosition=0
1474 )
1475
1476 # for each vnf in ns, read vnfd
1477 for vnfr in db_vnfrs_list:
1478 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1479 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1480 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1481 # if we haven't this vnfd, read it from db
1482 if vnfd_id not in db_vnfds:
1483 # read from db
1484 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1485 self.logger.debug(logging_text + step)
1486 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1487
1488 # store vnfd
1489 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1490 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1491 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1492
1493 # Get or generates the _admin.deployed.VCA list
1494 vca_deployed_list = None
1495 if db_nsr["_admin"].get("deployed"):
1496 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1497 if vca_deployed_list is None:
1498 vca_deployed_list = []
1499 configuration_status_list = []
1500 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1501 db_nsr_update["configurationStatus"] = configuration_status_list
1502 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1503 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1504 elif isinstance(vca_deployed_list, dict):
1505 # maintain backward compatibility. Change a dict to list at database
1506 vca_deployed_list = list(vca_deployed_list.values())
1507 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1508 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1509
1510 db_nsr_update["detailed-status"] = "creating"
1511 db_nsr_update["operational-status"] = "init"
1512
1513 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1514 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1515 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1516
1517 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1518 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1519 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1520
1521 # n2vc_redesign STEP 2 Deploy Network Scenario
1522
1523 self._write_op_status(
1524 op_id=nslcmop_id,
1525 stage='Stage 2/5: deployment of VMs and execution environments'
1526 )
1527
1528 self.logger.debug(logging_text + "Before deploy_kdus")
1529 # Call to deploy_kdus in case exists the "vdu:kdu" param
1530 task_kdu = asyncio.ensure_future(
1531 self.deploy_kdus(
1532 logging_text=logging_text,
1533 nsr_id=nsr_id,
1534 db_nsr=db_nsr,
1535 db_vnfrs=db_vnfrs,
1536 db_vnfds=db_vnfds
1537 )
1538 )
1539 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1540 task_instantiation_info[task_kdu] = "Deploy KDUs"
1541 task_instantiation_list.append(task_kdu)
1542 # n2vc_redesign STEP 1 Get VCA public ssh-key
1543 # feature 1429. Add n2vc public key to needed VMs
1544 n2vc_key = self.n2vc.get_public_key()
1545 n2vc_key_list = [n2vc_key]
1546 if self.vca_config.get("public_key"):
1547 n2vc_key_list.append(self.vca_config["public_key"])
1548
1549 task_ro = asyncio.ensure_future(
1550 self.instantiate_RO(
1551 logging_text=logging_text,
1552 nsr_id=nsr_id,
1553 nsd=nsd,
1554 db_nsr=db_nsr,
1555 db_nslcmop=db_nslcmop,
1556 db_vnfrs=db_vnfrs,
1557 db_vnfds_ref=db_vnfds_ref,
1558 n2vc_key_list=n2vc_key_list
1559 )
1560 )
1561 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1562 task_instantiation_info[task_ro] = "Deploy at VIM"
1563 task_instantiation_list.append(task_ro)
1564
1565 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1566 step = "Deploying proxy and native charms"
1567 self.logger.debug(logging_text + step)
1568
1569 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1570 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1571 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1572 vnfd_id = c_vnf["vnfd-id-ref"]
1573 vnfd = db_vnfds_ref[vnfd_id]
1574 member_vnf_index = str(c_vnf["member-vnf-index"])
1575 db_vnfr = db_vnfrs[member_vnf_index]
1576 base_folder = vnfd["_admin"]["storage"]
1577 vdu_id = None
1578 vdu_index = 0
1579 vdu_name = None
1580 kdu_name = None
1581
1582 # Get additional parameters
1583 deploy_params = {}
1584 if db_vnfr.get("additionalParamsForVnf"):
1585 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1586
1587 descriptor_config = vnfd.get("vnf-configuration")
1588 if descriptor_config and descriptor_config.get("juju"):
1589 self._deploy_n2vc(
1590 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1591 db_nsr=db_nsr,
1592 db_vnfr=db_vnfr,
1593 nslcmop_id=nslcmop_id,
1594 nsr_id=nsr_id,
1595 nsi_id=nsi_id,
1596 vnfd_id=vnfd_id,
1597 vdu_id=vdu_id,
1598 kdu_name=kdu_name,
1599 member_vnf_index=member_vnf_index,
1600 vdu_index=vdu_index,
1601 vdu_name=vdu_name,
1602 deploy_params=deploy_params,
1603 descriptor_config=descriptor_config,
1604 base_folder=base_folder,
1605 task_instantiation_list=task_instantiation_list,
1606 task_instantiation_info=task_instantiation_info
1607 )
1608
1609 # Deploy charms for each VDU that supports one.
1610 for vdud in get_iterable(vnfd, 'vdu'):
1611 vdu_id = vdud["id"]
1612 descriptor_config = vdud.get('vdu-configuration')
1613 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1614 if vdur.get("additionalParams"):
1615 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1616 else:
1617 deploy_params_vdu = deploy_params
1618 if descriptor_config and descriptor_config.get("juju"):
1619 # look for vdu index in the db_vnfr["vdu"] section
1620 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1621 # if vdur["vdu-id-ref"] == vdu_id:
1622 # break
1623 # else:
1624 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1625 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1626 # vdu_name = vdur.get("name")
1627 vdu_name = None
1628 kdu_name = None
1629 for vdu_index in range(int(vdud.get("count", 1))):
1630 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1631 self._deploy_n2vc(
1632 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1633 member_vnf_index, vdu_id, vdu_index),
1634 db_nsr=db_nsr,
1635 db_vnfr=db_vnfr,
1636 nslcmop_id=nslcmop_id,
1637 nsr_id=nsr_id,
1638 nsi_id=nsi_id,
1639 vnfd_id=vnfd_id,
1640 vdu_id=vdu_id,
1641 kdu_name=kdu_name,
1642 member_vnf_index=member_vnf_index,
1643 vdu_index=vdu_index,
1644 vdu_name=vdu_name,
1645 deploy_params=deploy_params_vdu,
1646 descriptor_config=descriptor_config,
1647 base_folder=base_folder,
1648 task_instantiation_list=task_instantiation_list,
1649 task_instantiation_info=task_instantiation_info
1650 )
1651 for kdud in get_iterable(vnfd, 'kdu'):
1652 kdu_name = kdud["name"]
1653 descriptor_config = kdud.get('kdu-configuration')
1654 if descriptor_config and descriptor_config.get("juju"):
1655 vdu_id = None
1656 vdu_index = 0
1657 vdu_name = None
1658 # look for vdu index in the db_vnfr["vdu"] section
1659 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1660 # if vdur["vdu-id-ref"] == vdu_id:
1661 # break
1662 # else:
1663 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1664 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1665 # vdu_name = vdur.get("name")
1666 # vdu_name = None
1667
1668 self._deploy_n2vc(
1669 logging_text=logging_text,
1670 db_nsr=db_nsr,
1671 db_vnfr=db_vnfr,
1672 nslcmop_id=nslcmop_id,
1673 nsr_id=nsr_id,
1674 nsi_id=nsi_id,
1675 vnfd_id=vnfd_id,
1676 vdu_id=vdu_id,
1677 kdu_name=kdu_name,
1678 member_vnf_index=member_vnf_index,
1679 vdu_index=vdu_index,
1680 vdu_name=vdu_name,
1681 deploy_params=deploy_params,
1682 descriptor_config=descriptor_config,
1683 base_folder=base_folder,
1684 task_instantiation_list=task_instantiation_list,
1685 task_instantiation_info=task_instantiation_info
1686 )
1687
1688 # Check if this NS has a charm configuration
1689 descriptor_config = nsd.get("ns-configuration")
1690 if descriptor_config and descriptor_config.get("juju"):
1691 vnfd_id = None
1692 db_vnfr = None
1693 member_vnf_index = None
1694 vdu_id = None
1695 kdu_name = None
1696 vdu_index = 0
1697 vdu_name = None
1698
1699 # Get additional parameters
1700 deploy_params = {}
1701 if db_nsr.get("additionalParamsForNs"):
1702 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1703 base_folder = nsd["_admin"]["storage"]
1704 self._deploy_n2vc(
1705 logging_text=logging_text,
1706 db_nsr=db_nsr,
1707 db_vnfr=db_vnfr,
1708 nslcmop_id=nslcmop_id,
1709 nsr_id=nsr_id,
1710 nsi_id=nsi_id,
1711 vnfd_id=vnfd_id,
1712 vdu_id=vdu_id,
1713 kdu_name=kdu_name,
1714 member_vnf_index=member_vnf_index,
1715 vdu_index=vdu_index,
1716 vdu_name=vdu_name,
1717 deploy_params=deploy_params,
1718 descriptor_config=descriptor_config,
1719 base_folder=base_folder,
1720 task_instantiation_list=task_instantiation_list,
1721 task_instantiation_info=task_instantiation_info
1722 )
1723
1724 # Wait until all tasks of "task_instantiation_list" have been finished
1725
1726 error_text_list = []
1727
1728 # let's begin with all OK
1729 instantiated_ok = True
1730 # let's begin with RO 'running' status (later we can change it)
1731 db_nsr_update["operational-status"] = "running"
1732 # let's begin with VCA 'configured' status (later we can change it)
1733 db_nsr_update["config-status"] = "configured"
1734
1735 step = "Waiting for tasks to be finished"
1736 if task_instantiation_list:
1737 # wait for all tasks completion
1738 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1739
1740 for task in pending:
1741 instantiated_ok = False
1742 if task in (task_ro, task_kdu):
1743 # RO or KDU task is pending
1744 db_nsr_update["operational-status"] = "failed"
1745 else:
1746 # A N2VC task is pending
1747 db_nsr_update["config-status"] = "failed"
1748 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1749 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1750 for task in done:
1751 if task.cancelled():
1752 instantiated_ok = False
1753 if task in (task_ro, task_kdu):
1754 # RO or KDU task was cancelled
1755 db_nsr_update["operational-status"] = "failed"
1756 else:
1757 # A N2VC was cancelled
1758 db_nsr_update["config-status"] = "failed"
1759 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1760 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1761 else:
1762 exc = task.exception()
1763 if exc:
1764 instantiated_ok = False
1765 if task in (task_ro, task_kdu):
1766 # RO or KDU task raised an exception
1767 db_nsr_update["operational-status"] = "failed"
1768 else:
1769 # A N2VC task raised an exception
1770 db_nsr_update["config-status"] = "failed"
1771 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1772
1773 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1774 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1775 else:
1776 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1777 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1778 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1779 else:
1780 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1781
1782 if error_text_list:
1783 error_text = "\n".join(error_text_list)
1784 db_nsr_update["detailed-status"] = error_text
1785 db_nslcmop_update["detailed-status"] = error_text
1786 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1787 db_nslcmop_update["statusEnteredTime"] = time()
1788 else:
1789 # all is done
1790 db_nsr_update["detailed-status"] = "done"
1791 db_nslcmop_update["detailed-status"] = "done"
1792 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1793 db_nslcmop_update["statusEnteredTime"] = time()
1794
1795 except (ROclient.ROClientException, DbException, LcmException) as e:
1796 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1797 exc = e
1798 except asyncio.CancelledError:
1799 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1800 exc = "Operation was cancelled"
1801 except Exception as e:
1802 exc = traceback.format_exc()
1803 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1804 exc_info=True)
1805 finally:
1806 if exc:
1807 instantiated_ok = False
1808 if db_nsr:
1809 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1810 db_nsr_update["operational-status"] = "failed"
1811 db_nsr_update["config-status"] = "failed"
1812 if db_nslcmop:
1813 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1814 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1815 db_nslcmop_update["statusEnteredTime"] = time()
1816 try:
1817 if db_nsr:
1818 db_nsr_update["_admin.nslcmop"] = None
1819 db_nsr_update["_admin.current-operation"] = None
1820 db_nsr_update["_admin.operation-type"] = None
1821 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1822
1823 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1824 ns_state = None
1825 error_description = None
1826 if instantiated_ok:
1827 ns_state = "READY"
1828 else:
1829 ns_state = "BROKEN"
1830 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1831
1832 self._write_ns_status(
1833 nsr_id=nsr_id,
1834 ns_state=ns_state,
1835 current_operation="IDLE",
1836 current_operation_id=None,
1837 error_description=error_description
1838 )
1839
1840 self._write_op_status(
1841 op_id=nslcmop_id,
1842 error_message=error_description
1843 )
1844
1845 if db_nslcmop_update:
1846 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1847
1848 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1849
1850 except DbException as e:
1851 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1852
1853 if nslcmop_operation_state:
1854 try:
1855 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1856 "operationState": nslcmop_operation_state},
1857 loop=self.loop)
1858 except Exception as e:
1859 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1860
1861 self.logger.debug(logging_text + "Exit")
1862 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1863
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the relations in which the VCA stored at position 'vca_index' takes part.

    Steps:
      1. find all NS-level and VNF-level relations that reference this VCA
      2. poll the nsr record until both peers of a relation have their configuration software installed
      3. add the relation through N2VC and stop tracking it

    A relation is dropped without being added as soon as one of its peers is reported as 'BROKEN' in
    'configurationStatus'.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the nsr record that holds '_admin.deployed.VCA'
    :param vca_index: position of this VCA inside '_admin.deployed.VCA'
    :param timeout: maximum time, in seconds, spent waiting for the peers
    :return: True when there is nothing (left) to add; False on timeout or on any exception
    """

    def _peer_ids(relation):
        # ids of both ends of a relation
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    async def _process_relations(relations, vca_key, db_nsr):
        """
        Try to add every relation of the list. The list received is never modified while it is
        being iterated; the relations that must be retried later are returned in a new list.
        :param relations: pending relations
        :param vca_key: key of the VCA entry compared against the entity id
                        ('member-vnf-index' for NS relations, 'vdu_id' for VNF relations)
        :param db_nsr: freshly read nsr record
        """
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA')) or []
        vca_status_list = db_nsr.get('configurationStatus') or []
        still_pending = []
        for relation in relations:
            entities = relation.get('entities')
            from_id, to_id = _peer_ids(relation)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # skip empty slots and peers whose configuration software is not installed yet
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # both peers ready: add relation and forget about it
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=entities[0].get('endpoint'),
                    endpoint_2=entities[1].get('endpoint'))
                continue

            # check failed peers: a relation with a broken peer is removed from the pending list.
            # zip() stops at the shortest list, so a shorter status list cannot raise
            peer_broken = any(
                vca and vca_status and
                vca.get(vca_key) in (from_id, to_id) and
                vca_status.get('status') == 'BROKEN'
                for vca, vca_status in zip(vca_list, vca_status_list))
            if not peer_broken:
                still_pending.append(relation)
        return still_pending

    try:

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is one of the peers
        ns_relations = [
            r for r in deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ()
            if my_vca.get('member-vnf-index') in _peer_ids(r)]

        # read all vnf-configuration relations where this VCA is one of the peers
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            vnf_relations.extend(
                r for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()
                if my_vca.get('vdu_id') in _peer_ids(r))

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEPS 2 and 3: wait for the peers and add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (other tasks update _admin.deployed.VCA in the meantime)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            ns_relations = await _process_relations(ns_relations, 'member-vnf-index', db_nsr)
            vnf_relations = await _process_relations(vnf_relations, 'vdu_id', db_nsr)

            # check completion before sleeping, so that a finished job returns immediately
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(5.0)

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2023
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
    """
    Launch the KDUs (helm charts or juju bundles) found in the 'kdur' list of every vnfr and wait
    until all the installations finish.

    For each KDU an entry is written at '<nsrs>._admin.deployed.K8s.<index>'; once the installation
    ends, its 'kdu-instance' (on success) or its 'detailed-status' (on error or timeout) is filled.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the nsr record to update
    :param db_nsr: nsr record. It is not read by this method
    :param db_vnfrs: dictionary with the vnfr records; only its values are used
    :param db_vnfds: dictionary of vnfd records indexed by the 'vnfd-id' of the vnfr
    :return: None
    :raises LcmException: if any KDU cannot be prepared or deployed, or on any other error
    """
    deployed_ok = True
    # both are read by the 'except'/'finally' clauses, so they must exist before entering the 'try'
    step = "Initializing KDU deployment"
    db_nsr_update = {}

    # cache of already resolved clusters: cluster type -> k8scluster _id -> '_admin.<type>.id'
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    try:
        db_nsr_update["_admin.deployed.K8s"] = []
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}
        index = 0
        # clusters whose helm repos have already been synchronized during this call. It must live
        # outside the loops; otherwise the repos would be synchronized again for every KDU
        updated_cluster_list = []
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                vnfd_id = vnfr_data.get('vnfd-id')
                pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"
                    # a KDU that cannot be launched makes the whole deployment fail, as it happens
                    # when its cluster cannot be resolved
                    deployed_ok = False

                if not error_text:
                    # check if kdumodel is a file and exists
                    try:
                        # path format: /vnfdid/pkkdir/kdumodel
                        filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                    except Exception:
                        # best effort: it is not a file inside the package, use kdumodel as it is
                        pass

                step = "Prepare instantiate KDU {} in k8s cluster {}".format(
                    kdur["kdu-name"], kdur["k8s-cluster"]["id"])

                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)

                    if k8sclustertype == "chart" and cluster_uuid not in updated_cluster_list:
                        del_repo_list, added_repo_dict = await self.k8sclusterhelm.synchronize_repos(
                            cluster_uuid=cluster_uuid)
                        if del_repo_list or added_repo_dict:
                            unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                            updated = {'_admin.helm_charts_added.' +
                                       item: name for item, name in added_repo_dict.items()}
                            self.logger.debug(logging_text + "repos synchronized, to_delete: {}, to_add: {}".
                                              format(del_repo_list, added_repo_dict))
                            self.db.set_one("k8sclusters", {"_id": kdur["k8s-cluster"]["id"]},
                                            updated, unset=unset)
                        updated_cluster_list.append(cluster_uuid)

                except LcmException as e:
                    error_text = str(e)
                    deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instance_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instance_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # NOTE(review): 'index' is not advanced here, so the next KDU reuses this
                    # position and replaces the error entry just written - confirm it is intended
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
                           "{}".format(index)}

                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600,
                                                    kdu_name=kdur["kdu-name"])
                    )

                # database path prefix where the result of this task will be written
                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1

        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            pending_list = list(pending_tasks.keys())
            while pending_list:
                done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                             return_when=asyncio.FIRST_COMPLETED)
                if not done_list:   # timeout
                    # NOTE(review): the tasks still pending are reported as "Timeout" but they are
                    # not cancelled - confirm whether they should be
                    for task in pending_list:
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                    deployed_ok = False
                    break
                for task in done_list:
                    exc = task.exception()
                    if exc:
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                        deployed_ok = False
                    else:
                        db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)
        self.logger.error(msg)
        # keep the original exception as the cause for easier troubleshooting
        raise LcmException(msg) from e
    finally:
        # store whatever result is still pending to be written
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2168
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_list, task_instantiation_info):
    """
    Launch instantiate_N2VC as an asyncio task and register the task object.

    The entry of this charm is looked for at db_nsr["_admin"]["deployed"]["VCA"]. If it is not found,
    a new entry is created, stored at database and appended to that list.

    :param task_instantiation_list: list where the new task is appended
    :param task_instantiation_info: dictionary where a description of the new task is stored
    :return: None
    """
    deployed_vcas = db_nsr["_admin"]["deployed"]["VCA"]
    wanted = (member_vnf_index, vdu_id, kdu_name, vdu_index)

    # position of the first non-empty entry that describes this same charm, if any
    vca_index = next(
        (position for position, entry in enumerate(deployed_vcas)
         if entry and (entry.get("member-vnf-index"), entry.get("vdu_id"), entry.get("kdu_name"),
                       entry.get("vdu_count_index", 0)) == wanted),
        None)

    if vca_index is None:
        # not found: the new entry goes at the end of the list
        vca_index = len(deployed_vcas)
        new_entry = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",   # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }

        # create VCA and configurationStatus in db
        self.update_db_2("nsrs", nsr_id, {
            "_admin.deployed.VCA.{}".format(vca_index): new_entry,
            "configurationStatus.{}".format(vca_index): dict()
        })

        deployed_vcas.append(new_entry)

    # Launch task
    n2vc_coroutine = self.instantiate_N2VC(
        logging_text=logging_text,
        vca_index=vca_index,
        nsi_id=nsi_id,
        db_nsr=db_nsr,
        db_vnfr=db_vnfr,
        vdu_id=vdu_id,
        kdu_name=kdu_name,
        vdu_index=vdu_index,
        deploy_params=deploy_params,
        config_descriptor=descriptor_config,
        base_folder=base_folder,
        nslcmop_id=nslcmop_id
    )
    task_n2vc = asyncio.ensure_future(n2vc_coroutine)
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
    task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
    task_instantiation_list.append(task_n2vc)
2230
2231 # Check if this VNFD has a configured terminate action
2232 def _has_terminate_config_primitive(self, vnfd):
2233 vnf_config = vnfd.get("vnf-configuration")
2234 if vnf_config and vnf_config.get("terminate-config-primitive"):
2235 return True
2236 else:
2237 return False
2238
2239 @staticmethod
2240 def _get_terminate_config_primitive_seq_list(vnfd):
2241 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2242 # No need to check for existing primitive twice, already done before
2243 vnf_config = vnfd.get("vnf-configuration")
2244 seq_list = vnf_config.get("terminate-config-primitive")
2245 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2246 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2247 return seq_list_sorted
2248
2249 @staticmethod
2250 def _create_nslcmop(nsr_id, operation, params):
2251 """
2252 Creates a ns-lcm-opp content to be stored at database.
2253 :param nsr_id: internal id of the instance
2254 :param operation: instantiate, terminate, scale, action, ...
2255 :param params: user parameters for the operation
2256 :return: dictionary following SOL005 format
2257 """
2258 # Raise exception if invalid arguments
2259 if not (nsr_id and operation and params):
2260 raise LcmException(
2261 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2262 now = time()
2263 _id = str(uuid4())
2264 nslcmop = {
2265 "id": _id,
2266 "_id": _id,
2267 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2268 "operationState": "PROCESSING",
2269 "statusEnteredTime": now,
2270 "nsInstanceId": nsr_id,
2271 "lcmOperationType": operation,
2272 "startTime": now,
2273 "isAutomaticInvocation": False,
2274 "operationParams": params,
2275 "isCancelPending": False,
2276 "links": {
2277 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2278 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2279 }
2280 }
2281 return nslcmop
2282
2283 def _format_additional_params(self, params):
2284 params = params or {}
2285 for key, value in params.items():
2286 if str(value).startswith("!!yaml "):
2287 params[key] = yaml.safe_load(value[7:])
2288 return params
2289
2290 def _get_terminate_primitive_params(self, seq, vnf_index):
2291 primitive = seq.get('name')
2292 primitive_params = {}
2293 params = {
2294 "member_vnf_index": vnf_index,
2295 "primitive": primitive,
2296 "primitive_params": primitive_params,
2297 }
2298 desc_params = {}
2299 return self._map_primitive_params(seq, params, desc_params)
2300
2301 # sub-operations
2302
2303 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2304 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2305 if (op.get('operationState') == 'COMPLETED'):
2306 # b. Skip sub-operation
2307 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2308 return self.SUBOPERATION_STATUS_SKIP
2309 else:
2310 # c. Reintent executing sub-operation
2311 # The sub-operation exists, and operationState != 'COMPLETED'
2312 # Update operationState = 'PROCESSING' to indicate a reintent.
2313 operationState = 'PROCESSING'
2314 detailed_status = 'In progress'
2315 self._update_suboperation_status(
2316 db_nslcmop, op_index, operationState, detailed_status)
2317 # Return the sub-operation index
2318 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2319 # with arguments extracted from the sub-operation
2320 return op_index
2321
2322 # Find a sub-operation where all keys in a matching dictionary must match
2323 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2324 def _find_suboperation(self, db_nslcmop, match):
2325 if (db_nslcmop and match):
2326 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2327 for i, op in enumerate(op_list):
2328 if all(op.get(k) == match[k] for k in match):
2329 return i
2330 return self.SUBOPERATION_STATUS_NOT_FOUND
2331
2332 # Update status for a sub-operation given its index
2333 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2334 # Update DB for HA tasks
2335 q_filter = {'_id': db_nslcmop['_id']}
2336 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2337 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2338 self.db.set_one("nslcmops",
2339 q_filter=q_filter,
2340 update_dict=update_dict,
2341 fail_on_empty=False)
2342
2343 # Add sub-operation, return the index of the added sub-operation
2344 # Optionally, set operationState, detailed-status, and operationType
2345 # Status and type are currently set for 'scale' sub-operations:
2346 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2347 # 'detailed-status' : status message
2348 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2349 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2350 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2351 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2352 RO_nsr_id=None, RO_scaling_info=None):
2353 if not (db_nslcmop):
2354 return self.SUBOPERATION_STATUS_NOT_FOUND
2355 # Get the "_admin.operations" list, if it exists
2356 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2357 op_list = db_nslcmop_admin.get('operations')
2358 # Create or append to the "_admin.operations" list
2359 new_op = {'member_vnf_index': vnf_index,
2360 'vdu_id': vdu_id,
2361 'vdu_count_index': vdu_count_index,
2362 'primitive': primitive,
2363 'primitive_params': mapped_primitive_params}
2364 if operationState:
2365 new_op['operationState'] = operationState
2366 if detailed_status:
2367 new_op['detailed-status'] = detailed_status
2368 if operationType:
2369 new_op['lcmOperationType'] = operationType
2370 if RO_nsr_id:
2371 new_op['RO_nsr_id'] = RO_nsr_id
2372 if RO_scaling_info:
2373 new_op['RO_scaling_info'] = RO_scaling_info
2374 if not op_list:
2375 # No existing operations, create key 'operations' with current operation as first list element
2376 db_nslcmop_admin.update({'operations': [new_op]})
2377 op_list = db_nslcmop_admin.get('operations')
2378 else:
2379 # Existing operations, append operation to list
2380 op_list.append(new_op)
2381
2382 db_nslcmop_update = {'_admin.operations': op_list}
2383 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2384 op_index = len(op_list) - 1
2385 return op_index
2386
2387 # Helper methods for scale() sub-operations
2388
2389 # pre-scale/post-scale:
2390 # Check for 3 different cases:
2391 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2392 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2393 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2394 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2395 operationType, RO_nsr_id=None, RO_scaling_info=None):
2396 # Find this sub-operation
2397 if (RO_nsr_id and RO_scaling_info):
2398 operationType = 'SCALE-RO'
2399 match = {
2400 'member_vnf_index': vnf_index,
2401 'RO_nsr_id': RO_nsr_id,
2402 'RO_scaling_info': RO_scaling_info,
2403 }
2404 else:
2405 match = {
2406 'member_vnf_index': vnf_index,
2407 'primitive': vnf_config_primitive,
2408 'primitive_params': primitive_params,
2409 'lcmOperationType': operationType
2410 }
2411 op_index = self._find_suboperation(db_nslcmop, match)
2412 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2413 # a. New sub-operation
2414 # The sub-operation does not exist, add it.
2415 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2416 # The following parameters are set to None for all kind of scaling:
2417 vdu_id = None
2418 vdu_count_index = None
2419 vdu_name = None
2420 if (RO_nsr_id and RO_scaling_info):
2421 vnf_config_primitive = None
2422 primitive_params = None
2423 else:
2424 RO_nsr_id = None
2425 RO_scaling_info = None
2426 # Initial status for sub-operation
2427 operationState = 'PROCESSING'
2428 detailed_status = 'In progress'
2429 # Add sub-operation for pre/post-scaling (zero or more operations)
2430 self._add_suboperation(db_nslcmop,
2431 vnf_index,
2432 vdu_id,
2433 vdu_count_index,
2434 vdu_name,
2435 vnf_config_primitive,
2436 primitive_params,
2437 operationState,
2438 detailed_status,
2439 operationType,
2440 RO_nsr_id,
2441 RO_scaling_info)
2442 return self.SUBOPERATION_STATUS_NEW
2443 else:
2444 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2445 # or op_index (operationState != 'COMPLETED')
2446 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2447
2448 # Function to return execution_environment id
2449
2450 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2451 for vca in vca_deployed_list:
2452 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2453 return vca["ee_id"]
2454
2455 # Helper methods for terminate()
2456
2457 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2458 """ Create a primitive with params from VNFD
2459 Called from terminate() before deleting instance
2460 Calls action() to execute the primitive """
2461 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2462 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2463 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2464 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2465 db_vnfds = {}
2466 # Loop over VNFRs
2467 for vnfr in db_vnfrs_list:
2468 vnfd_id = vnfr["vnfd-id"]
2469 vnf_index = vnfr["member-vnf-index-ref"]
2470 if vnfd_id not in db_vnfds:
2471 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2472 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2473 db_vnfds[vnfd_id] = vnfd
2474 vnfd = db_vnfds[vnfd_id]
2475 if not self._has_terminate_config_primitive(vnfd):
2476 continue
2477 # Get the primitive's sorted sequence list
2478 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2479 for seq in seq_list:
2480 # For each sequence in list, get primitive and call _ns_execute_primitive()
2481 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2482 vnf_index, seq.get("name"))
2483 self.logger.debug(logging_text + step)
2484 # Create the primitive for each sequence, i.e. "primitive": "touch"
2485 primitive = seq.get('name')
2486 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2487 # The following 3 parameters are currently set to None for 'terminate':
2488 # vdu_id, vdu_count_index, vdu_name
2489 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2490 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2491 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2492 # Add sub-operation
2493 self._add_suboperation(db_nslcmop,
2494 nslcmop_id,
2495 vnf_index,
2496 vdu_id,
2497 vdu_count_index,
2498 vdu_name,
2499 primitive,
2500 mapped_primitive_params)
2501 # Sub-operations: Call _ns_execute_primitive() instead of action()
2502 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2503 # nsr_deployed = db_nsr["_admin"]["deployed"]
2504
2505 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2506 # nsr_id, nslcmop_terminate_action_id)
2507 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2508 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2509 # if result not in result_ok:
2510 # raise LcmException(
2511 # "terminate_primitive_action for vnf_member_index={}",
2512 # " primitive={} fails with error {}".format(
2513 # vnf_index, seq.get("name"), result_detail))
2514
2515 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2516 try:
2517 await self.n2vc.exec_primitive(
2518 ee_id=ee_id,
2519 primitive_name=primitive,
2520 params_dict=mapped_primitive_params
2521 )
2522 except Exception as e:
2523 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2524 raise LcmException(
2525 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2526 .format(vnf_index, seq.get("name"), e),
2527 )
2528
2529 async def _delete_N2VC(self, nsr_id: str):
2530 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2531 namespace = "." + nsr_id
2532 await self.n2vc.delete_namespace(namespace=namespace)
2533 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2534
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.

    Steps: run the terminate primitives (_terminate_action), launch in background the deletion of the
    execution environments and of the KDUs, delete the ns, nsd and vnfds from RO, update the nsrs and
    nslcmops database records, send the "terminated" notification and finally wait for the background
    deletion tasks.

    :param nsr_id: identifier of the ns instance record
    :param nslcmop_id: identifier of the terminate operation record
    :return: None. The result is stored at database and notified through the message bus
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "terminate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    pending_tasks = []
    # Must be bound before the try: the finally clause reads it also when the try is left early
    terminate_ok = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0
        )

        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # Nothing to delete: treated as a successful no-op so that the finally clause restores
            # the ns status instead of failing
            self.logger.debug(logging_text + "ns is already NOT_INSTANTIATED")
            terminate_ok = True
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        # Call internal terminate action
        await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        # remove NS
        try:
            step = "delete execution environment"
            self.logger.debug(logging_text + step)

            # launched in background; it is awaited at the end, together with the KDU deletions
            task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
            pending_tasks.append(task_delete_ee)
        except Exception as e:
            msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
            self.logger.error(msg)
            failed_detail.append(msg)

        try:
            # Delete from k8scluster
            step = "delete kdus"
            self.logger.debug(logging_text + step)
            if nsr_deployed:
                for kdu in nsr_deployed.get("K8s", ()):
                    kdu_instance = kdu.get("kdu-instance")
                    if not kdu_instance:
                        continue
                    if kdu.get("k8scluster-type") == "chart":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    elif kdu.get("k8scluster-type") == "juju":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    else:
                        self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                          format(kdu.get("k8scluster-type")))
                        continue
                    pending_tasks.append(task_delete_kdu_instance)
        except LcmException as e:
            msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
            self.logger.error(msg)
            failed_detail.append(msg)

        # remove from RO
        RO_fail = False

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns from VIM"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await self.RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=RO_nsr_id,
                        extra_item="action",
                        extra_item_id=RO_delete_action)

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write to database only when the reported status changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd from RO"
                await self.RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await self.RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            terminate_ok = False
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            terminate_ok = True
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                if terminate_ok and not exc:
                    ns_state = "IDLE"
                    error_description = None
                    error_detail = None
                else:
                    ns_state = "BROKEN"
                    error_detail = "; ".join(failed_detail)
                    if exc:
                        # the captured exception is not part of failed_detail
                        error_detail = "{}; {}".format(error_detail, exc) if error_detail else str(exc)
                    error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
                        .format(nslcmop_id, step, error_detail)

                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description
                )

                self._write_op_status(
                    op_id=nslcmop_id,
                    error_message=error_description
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        # wait for pending tasks
        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            done, pending = await asyncio.wait(pending_tasks, timeout=3600)
            # retrieve the result of finished tasks so that their failures are not silently lost
            for task in done:
                if task.cancelled():
                    continue
                task_exc = task.exception()
                if task_exc:
                    self.logger.error(logging_text + "Terminate pending task failed: {}".format(task_exc))
            if not pending:
                self.logger.debug(logging_text + 'All tasks finished...')
            else:
                self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2840
2841 @staticmethod
2842 def _map_primitive_params(primitive_desc, params, instantiation_params):
2843 """
2844 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2845 The default-value is used. If it is between < > it look for a value at instantiation_params
2846 :param primitive_desc: portion of VNFD/NSD that describes primitive
2847 :param params: Params provided by user
2848 :param instantiation_params: Instantiation params provided by user
2849 :return: a dictionary with the calculated params
2850 """
2851 calculated_params = {}
2852 for parameter in primitive_desc.get("parameter", ()):
2853 param_name = parameter["name"]
2854 if param_name in params:
2855 calculated_params[param_name] = params[param_name]
2856 elif "default-value" in parameter or "value" in parameter:
2857 if "value" in parameter:
2858 calculated_params[param_name] = parameter["value"]
2859 else:
2860 calculated_params[param_name] = parameter["default-value"]
2861 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2862 and calculated_params[param_name].endswith(">"):
2863 if calculated_params[param_name][1:-1] in instantiation_params:
2864 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2865 else:
2866 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2867 format(calculated_params[param_name], primitive_desc["name"]))
2868 else:
2869 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2870 format(param_name, primitive_desc["name"]))
2871
2872 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2873 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2874 width=256)
2875 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2876 calculated_params[param_name] = calculated_params[param_name][7:]
2877
2878 # add always ns_config_info if primitive name is config
2879 if primitive_desc["name"] == "config":
2880 if "ns_config_info" in instantiation_params:
2881 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2882 return calculated_params
2883
2884 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2885 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2886
2887 # find vca_deployed record for this action
2888 try:
2889 for vca_deployed in db_deployed["VCA"]:
2890 if not vca_deployed:
2891 continue
2892 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2893 continue
2894 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2895 continue
2896 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2897 continue
2898 break
2899 else:
2900 # vca_deployed not found
2901 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2902 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2903
2904 # get ee_id
2905 ee_id = vca_deployed.get("ee_id")
2906 if not ee_id:
2907 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2908 "execution environment"
2909 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2910
2911 if primitive == "config":
2912 primitive_params = {"params": primitive_params}
2913
2914 while retries >= 0:
2915 try:
2916 output = await self.n2vc.exec_primitive(
2917 ee_id=ee_id,
2918 primitive_name=primitive,
2919 params_dict=primitive_params
2920 )
2921 # execution was OK
2922 break
2923 except Exception as e:
2924 retries -= 1
2925 if retries >= 0:
2926 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2927 # wait and retry
2928 await asyncio.sleep(retries_interval, loop=self.loop)
2929 else:
2930 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2931
2932 return output, 'OK'
2933
2934 except Exception as e:
2935 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2936
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an action operation over a ns, vnf, vdu or kdu.

    The primitive and its params are read from the "operationParams" of the nslcmop. For a kdu
    (operationParams "kdu_name") only the primitives "upgrade", "rollback" and "status" are handled
    here, through the helm or juju k8s connector. In any other case the primitive is looked for at the
    [ns|vnf|vdu]-configuration:config-primitive of the descriptor and executed with
    _ns_execute_primitive().

    :param nsr_id: identifier of the ns instance record
    :param nslcmop_id: identifier of the action operation record
    :return: None if the HA lock of the operation is not obtained; otherwise the tuple
        (operation state, detailed status), whose state is None only when the operation record could
        not be read
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")    # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if not kdur:
                raise LcmException("KDU '{}' not found at vnfr".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            index = 0
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if kdu_name == kdu["kdu-name"]:
                    db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                               "path": "_admin.deployed.K8s.{}".format(index)}
                    if primitive == "upgrade":
                        if desc_params.get("kdu_model"):
                            kdu_model = desc_params.get("kdu_model")
                            del desc_params["kdu_model"]
                        else:
                            kdu_model = kdu.get("kdu-model")
                            parts = kdu_model.split(sep=":")
                            if len(parts) == 2:
                                kdu_model = parts[0]

                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)

                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)

                        self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                        break
                    elif primitive == "rollback":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                    elif primitive == "status":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                index += 1

            else:
                # the loop ended without executing anything over a deployed kdu with this name
                raise LcmException("KDU '{}' not found".format(kdu_name))
            # Report the result also through nslcmop_operation_state, so that the finally clause
            # notifies it and returns it, as done for the rest of primitives
            if output:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = output
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'COMPLETED'
                db_nslcmop_update["statusEnteredTime"] = time()
            else:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = ''
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'FAILED'
                db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # a vdu without vdur entry has no additional params to apply
                if vdur and vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
                if exc:
                    self._write_op_status(
                        op_id=nslcmop_id,
                        error_message=nslcmop_operation_state_detail
                    )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return (inside finally) replaces the bare returns of the try clause
        return nslcmop_operation_state, nslcmop_operation_state_detail
3185
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a scale (in or out) operation over one vnf of a ns, following a vnfd scaling-group-descriptor.

    Steps, in order:
      1. obtain the HA lock for the operation (returns immediately if not obtained) and wait for related tasks
      2. read the operation ("nslcmops"), the ns record ("nsrs"), the vnfr and the vnfd from database
      3. compute the vdus to create/delete and check max/min-instance-count
      4. execute the matching "pre-scale-in"/"pre-scale-out" scaling-config-action primitives
      5. send the "vdu-scaling" action to RO, wait until it is done and update the vnfr
      6. execute the matching "post-scale-in"/"post-scale-out" scaling-config-action primitives
      7. write the final status at database, send the "scaled" message on topic "ns" and remove the task

    Exceptions raised inside the main try block are caught, logged and stored as a FAILED operation.

    :param nsr_id: _id of the ns record at "nsrs"
    :param nslcmop_id: _id of the operation at "nslcmops"
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "scale"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # kept to restore them when the operation finishes or fails
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # number of scale-out operations already applied to this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        else:
            # without this check the code below fails with a KeyError on 'scaling_direction'
            raise LcmException("input parameter 'scaleVnfData':'scaleVnfType':'{}' is not valid. Expected "
                               "'SCALE_OUT' or 'SCALE_IN'".format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy, because counters are decremented while selecting the vdur to delete
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the original (not decremented) counters and remove them from the info given to primitives
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was looked for; the loop variable is not the missing primitive
                        # and it is not even bound when there are no config-primitive at all
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second positional argument, whereas the
                    # SCALE-RO call of the same helper below does not. Confirm against the helper signature.
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (retry) args
                        result, result_detail = await self._ns_execute_primitive(
                            nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO retry check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Retry: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    # the message has no placeholder; do not reference vnf_config_primitive here because it is
                    # not bound when no pre-scale action has been executed
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: poll the RO action until it is ACTIVE
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            raise AssertionError(
                                "ROclient.check_action_status returns unknown {}".format(ns_status))
                    else:
                        # second phase: update the vnfr and wait until the management IP is available
                        # NOTE(review): ns_status is not refreshed in this branch; it keeps the value obtained
                        # from check_action_status at the previous phase. Confirm whether this is intended.
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet: try again at next iteration
                                pass
                        else:
                            raise AssertionError("ROclient.check_ns_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same arguments as the rest of calls to this helper in this method
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was looked for; the loop variable is not the missing primitive
                        # and it is not even bound when there are no config-primitive at all
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second positional argument, whereas the
                    # SCALE-RO call of the same helper above does not. Confirm against the helper signature.
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (retry) args
                        result, result_detail = await self._ns_execute_primitive(
                            nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                db_nsr_update["_admin.nslcmop"] = None
                # scale_process tells which part was in progress when the error happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None
            )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")