Synchronize helm repos on ns instantiation instead of creation
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
# Default timeouts. NOTE(review): the values look like seconds (5 * 60, 2 * 3600) - confirm against the callers
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 10 * 60  # timeout for primitive execution

# Negative sentinel codes for sub-operation status. Their consumers are not visible in this part of the file;
# presumably negative values are used so they cannot collide with a valid (non negative) index - TODO confirm
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
55
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Initializes the ns life-cycle manager: hands database, messaging and filesystem storage to the base class and
    creates the N2VC (juju), K8s helm, K8s juju and RO connectors
    :param db: database object, passed to the base class
    :param msg: messaging object, passed to the base class
    :param fs: filesystem storage object, passed to the base class
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: two level dictionary with configuration. The top level keys 'timeout', 'ro_config' and 'VCA'
        are read here
    :param loop: loop object handed to the N2VC connector and to the RO client
    :return: None
    """
    super().__init__(db=db, msg=msg, fs=fs, logger=logging.getLogger('lcm.ns'))

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    # private copy of the VCA section; the same object is shared with the N2VC connector
    vca_settings = config["VCA"].copy()
    self.vca_config = vca_settings

    # N2VC (juju) connector. It reports changes back through _on_update_n2vc_db
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url='{}:{}'.format(vca_settings['host'], vca_settings['port']),
        username=vca_settings.get('user'),
        vca_config=vca_settings,
        on_update_db=self._on_update_n2vc_db
    )

    # both K8s connectors share everything but the name of their own command line tool
    k8s_common_args = {
        "kubectl_command": vca_settings.get("kubectlpath"),
        "fs": self.fs,
        "log": self.logger,
        "db": self.db,
        "on_update_db": None,
    }
    self.k8sclusterhelm = K8sHelmConnector(helm_command=vca_settings.get("helmpath"), **k8s_common_args)
    self.k8sclusterjuju = K8sJujuConnector(juju_command=vca_settings.get("jujupath"), **k8s_common_args)

    # RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
107
108 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
109
110 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
111
112 try:
113 # TODO filter RO descriptor fields...
114
115 # write to database
116 db_dict = dict()
117 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
118 db_dict['deploymentStatus'] = ro_descriptor
119 self.update_db_2("nsrs", nsrs_id, db_dict)
120
121 except Exception as e:
122 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
124 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
125
126 # remove last dot from path (if exists)
127 if path.endswith('.'):
128 path = path[:-1]
129
130 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
131 # .format(table, filter, path, updated_data))
132
133 try:
134
135 nsr_id = filter.get('_id')
136
137 # read ns record from database
138 nsr = self.db.get_one(table='nsrs', q_filter=filter)
139 current_ns_status = nsr.get('nsState')
140
141 # get vca status for NS
142 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
143
144 # vcaStatus
145 db_dict = dict()
146 db_dict['vcaStatus'] = status_dict
147
148 # update configurationStatus for this VCA
149 try:
150 vca_index = int(path[path.rfind(".")+1:])
151
152 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
153 vca_status = vca_list[vca_index].get('status')
154
155 configuration_status_list = nsr.get('configurationStatus')
156 config_status = configuration_status_list[vca_index].get('status')
157
158 if config_status == 'BROKEN' and vca_status != 'failed':
159 db_dict['configurationStatus'][vca_index] = 'READY'
160 elif config_status != 'BROKEN' and vca_status == 'failed':
161 db_dict['configurationStatus'][vca_index] = 'BROKEN'
162 except Exception as e:
163 # not update configurationStatus
164 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
165
166 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
167 # if nsState = 'DEGRADED' check if all is OK
168 is_degraded = False
169 if current_ns_status in ('READY', 'DEGRADED'):
170 error_description = ''
171 # check machines
172 if status_dict.get('machines'):
173 for machine_id in status_dict.get('machines'):
174 machine = status_dict.get('machines').get(machine_id)
175 # check machine agent-status
176 if machine.get('agent-status'):
177 s = machine.get('agent-status').get('status')
178 if s != 'started':
179 is_degraded = True
180 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
181 # check machine instance status
182 if machine.get('instance-status'):
183 s = machine.get('instance-status').get('status')
184 if s != 'running':
185 is_degraded = True
186 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
187 # check applications
188 if status_dict.get('applications'):
189 for app_id in status_dict.get('applications'):
190 app = status_dict.get('applications').get(app_id)
191 # check application status
192 if app.get('status'):
193 s = app.get('status').get('status')
194 if s != 'active':
195 is_degraded = True
196 error_description += 'application {} status={} ; '.format(app_id, s)
197
198 if error_description:
199 db_dict['errorDescription'] = error_description
200 if current_ns_status == 'READY' and is_degraded:
201 db_dict['nsState'] = 'DEGRADED'
202 if current_ns_status == 'DEGRADED' and not is_degraded:
203 db_dict['nsState'] = 'READY'
204
205 # write to database
206 self.update_db_2("nsrs", nsr_id, db_dict)
207
208 except Exception as e:
209 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
210
211 return
212
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Builds the vnfd descriptor to be sent to RO from the OSM IM vnfd. The input is not modified: a deep copy is
    stripped from the keys not used by RO and the cloud-init (inline or read from file) of every vdu is rendered
    with Jinja2 using the instantiation parameters
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided, used as Jinja2 variables
    :param nsrId: Id of the NSR (not used here)
    :return: the new descriptor for RO
    :raises LcmException: if the cloud-init file cannot be read, a variable used by cloud-init is not provided or
        the template cannot be parsed
    """
    try:
        ro_vnfd = deepcopy(vnfd)
        # drop what RO does not use: internal keys, configuration, monitoring, scaling and k8s info
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            ro_vnfd.pop(unused_key, None)
        if new_id:
            ro_vnfd["id"] = new_id

        # render cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(ro_vnfd, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    raw_cloud_init = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                raw_cloud_init = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable referenced by the template must come in the instantiation parameters
            undeclared = meta.find_undeclared_variables(Environment().parse(raw_cloud_init))
            for var in undeclared:
                if additionalParams and var in additionalParams.keys():
                    continue
                raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                   "file, must be provided in the instantiation parameters inside the "
                                   "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(raw_cloud_init).render(additionalParams or {})

        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
270
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. If empty, None is returned
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map member-vnf-index to vnfd
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd content
    :param n2vc_key_list: list of keys set as 'mgmt_keys' on the VDUs that require ssh access. Can be None
    :return: The RO ns descriptor
    :raises LcmException: if a VIM or WIM account is not ENABLED, or if the instantiate params reference a
        member-vnf-index, internal connection point or connection point not present at the descriptors
    """
    vim_2_RO = {}   # cache of vim_account id: RO id
    wim_2_RO = {}   # cache of wim_account id: RO id
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO one, checking that the VIM is enabled
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # translates an OSM wim account id into the RO one. Non string values are returned untouched
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the key names and ip-version values used by RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    def vim_network_2_RO_sites(vim_network):
        # a dictionary is vim_account: network (one site per vim); otherwise it is a single network
        if isinstance(vim_network, dict):
            return [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                    for vim_account, vim_net in vim_network.items()]
        return [{"netmap-use": vim_network}]

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # look for the VDUs that need ssh access and set the n2vc keys as their mgmt_keys
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # a vdu without interfaces cannot own the management connection point
                for vdu_interface in vdu.get("interface") or ():
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in nsd.get("constituent-vnfd") or ():
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu owns an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))
        # both keys fill the same "sites" entry; when both are present vim-network-id is applied last
        for vim_network_key in ("vim-network-name", "vim-network-id"):
            if vld_params.get(vim_network_key):
                RO_vld_sites = vim_network_2_RO_sites(vld_params[vim_network_key])
                if RO_vld_sites:
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor are the ones that matched at the loops above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
541
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr and writes the resulting 'vdur' list to the 'vnfrs' database entry.
    New entries are deep copies of the last existing vdur with the same vdu-id-ref, with a new '_id' and a
    consecutive 'count-index'. Removed entries are the last ones of that vdu-id-ref. PDUs (entries with
    'pdu-type') are never touched
    :param db_vnfr: vnfr content. Its 'vdur' list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. Not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. Not modified
    :return: None
    :raises LcmException: if some requested vdu-id-ref cannot be fully scaled because there are not (enough)
        existing vdur for it
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk backwards: insertions and deletions only affect positions already visited
    for vdu_index in range(len(vdurs) - 1, -1, -1):
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new entry is cloned from the previous one, so count-index keeps growing
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each direction is checked on its own, so a pending scale-in is reported
    # as such and not as a scale-out
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
580
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the information reported by RO for its network (vim id, name, status and
    error message)
    :param ns_update_nsr: dictionary to be filled with the updated info, using 'vld.<index>' as keys
    :param db_nsr: content of db_nsr. Its vld entries are modified too
    :param nsr_desc_RO: nsr descriptor from RO. Its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised when RO does not report a net for some vld
    """
    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, if any
        ro_net = next((candidate for candidate in get_iterable(nsr_desc_RO, "nets")
                       if ns_vld["id"] == candidate.get("ns_net_osm_id")), None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))

        ns_vld["vim-id"] = ro_net.get("vim_net_id")
        ns_vld["name"] = ro_net.get("vim_name")
        ns_vld["status"] = ro_net.get("status")
        ns_vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = ns_vld
602
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    For every vnfr it looks for the RO vnf with the same member_vnf_index and copies: the management ip-address of
    the vnf, the VIM info of every vdur (PDUs are skipped) including its interfaces, and the VIM info of every
    internal vld. The changes of each vnfr are written to the 'vnfrs' database entry
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    :raises LcmExceptionNoMgmtIP: if a vnf with vdur has no ip address, neither at RO info nor already stored
    :raises LcmException: if a vnf, vdur, interface or vld is not found at the RO info
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO may report several addresses separated by ';'. Only the first one is kept
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms with the same vdu id skipped so far, to select the one at 'count-index'
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            # for-else: no RO interface with this name
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # for-else: no RO vm matches this vdur and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    # for-else: no RO net matches this internal vld
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # for-else: RO does not report any vnf with this member_vnf_index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
675
676 def _get_ns_config_info(self, nsr_id):
677 """
678 Generates a mapping between vnf,vdu elements and the N2VC id
679 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
680 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
681 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
682 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
683 """
684 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
685 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
686 mapping = {}
687 ns_config_info = {"osm-config-mapping": mapping}
688 for vca in vca_deployed_list:
689 if not vca["member-vnf-index"]:
690 continue
691 if not vca["vdu_id"]:
692 mapping[vca["member-vnf-index"]] = vca["application"]
693 else:
694 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
695 vca["application"]
696 return ns_config_info
697
698 @staticmethod
699 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
700 """
701 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
702 primitives as verify-ssh-credentials, or config when needed
703 :param desc_primitive_list: information of the descriptor
704 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
705 this element contains a ssh public key
706 :return: The modified list. Can ba an empty list, but always a list
707 """
708 if desc_primitive_list:
709 primitive_list = desc_primitive_list.copy()
710 else:
711 primitive_list = []
712 # look for primitive config, and get the position. None if not present
713 config_position = None
714 for index, primitive in enumerate(primitive_list):
715 if primitive["name"] == "config":
716 config_position = index
717 break
718
719 # for NS, add always a config primitive if not present (bug 874)
720 if not vca_deployed["member-vnf-index"] and config_position is None:
721 primitive_list.insert(0, {"name": "config", "parameter": []})
722 config_position = 0
723 # for VNF/VDU add verify-ssh-credentials after config
724 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
725 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
726 return primitive_list
727
728 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
729 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
730
731 db_nsr_update = {}
732 RO_descriptor_number = 0 # number of descriptors created at RO
733 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
734 start_deploy = time()
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # Check for and optionally request placement optimization. Database will be updated if placement activated
742 await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
743
744 # deploy RO
745
746 # get vnfds, instantiate at RO
747
748 for c_vnf in nsd.get("constituent-vnfd", ()):
749 member_vnf_index = c_vnf["member-vnf-index"]
750 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
751 vnfd_ref = vnfd["id"]
752 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
753 " RO".format(vnfd_ref, member_vnf_index)
754 # self.logger.debug(logging_text + step)
755 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
756 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
757 RO_descriptor_number += 1
758
759 # look position at deployed.RO.vnfd if not present it will be appended at the end
760 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
761 if vnf_deployed["member-vnf-index"] == member_vnf_index:
762 break
763 else:
764 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
765 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
766
767 # look if present
768 RO_update = {"member-vnf-index": member_vnf_index}
769 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
770 if vnfd_list:
771 RO_update["id"] = vnfd_list[0]["uuid"]
772 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
773 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
774 else:
775 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
776 get("additionalParamsForVnf"), nsr_id)
777 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
778 RO_update["id"] = desc["uuid"]
779 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
780 vnfd_ref, member_vnf_index, desc["uuid"]))
781 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
782 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
783 self.update_db_2("nsrs", nsr_id, db_nsr_update)
784
785 # create nsd at RO
786 nsd_ref = nsd["id"]
787 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
788 # self.logger.debug(logging_text + step)
789
790 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
791 RO_descriptor_number += 1
792 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
793 if nsd_list:
794 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
795 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
796 nsd_ref, RO_nsd_uuid))
797 else:
798 nsd_RO = deepcopy(nsd)
799 nsd_RO["id"] = RO_osm_nsd_id
800 nsd_RO.pop("_id", None)
801 nsd_RO.pop("_admin", None)
802 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
803 member_vnf_index = c_vnf["member-vnf-index"]
804 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
805 for c_vld in nsd_RO.get("vld", ()):
806 for cp in c_vld.get("vnfd-connection-point-ref", ()):
807 member_vnf_index = cp["member-vnf-index-ref"]
808 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
809
810 desc = await self.RO.create("nsd", descriptor=nsd_RO)
811 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
812 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
813 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
814 self.update_db_2("nsrs", nsr_id, db_nsr_update)
815
816 # Crate ns at RO
817 # if present use it unless in error status
818 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
819 if RO_nsr_id:
820 try:
821 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
822 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
823 desc = await self.RO.show("ns", RO_nsr_id)
824
825 except ROclient.ROClientException as e:
826 if e.http_code != HTTPStatus.NOT_FOUND:
827 raise
828 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
829 if RO_nsr_id:
830 ns_status, ns_status_info = self.RO.check_ns_status(desc)
831 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
832 if ns_status == "ERROR":
833 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
834 .format(RO_nsr_id)
835 self.logger.debug(logging_text + step)
836 await self.RO.delete("ns", RO_nsr_id)
837 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
838 if not RO_nsr_id:
839 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
840 # self.logger.debug(logging_text + step)
841
842 # check if VIM is creating and wait look if previous tasks in process
843 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
844 if task_dependency:
845 step = "Waiting for related tasks to be completed: {}".format(task_name)
846 self.logger.debug(logging_text + step)
847 await asyncio.wait(task_dependency, timeout=3600)
848 if ns_params.get("vnf"):
849 for vnf in ns_params["vnf"]:
850 if "vimAccountId" in vnf:
851 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
852 vnf["vimAccountId"])
853 if task_dependency:
854 step = "Waiting for related tasks to be completed: {}".format(task_name)
855 self.logger.debug(logging_text + step)
856 await asyncio.wait(task_dependency, timeout=3600)
857
858 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
859
860 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
861
862 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
863 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
864 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
865 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
866 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
867 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
868 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
869 self.update_db_2("nsrs", nsr_id, db_nsr_update)
870
871 # wait until NS is ready
872 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
873 detailed_status_old = None
874 self.logger.debug(logging_text + step)
875
876 old_desc = None
877 while time() <= start_deploy + timeout_ns_deploy:
878 desc = await self.RO.show("ns", RO_nsr_id)
879
880 # deploymentStatus
881 if desc != old_desc:
882 # desc has changed => update db
883 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
884 old_desc = desc
885
886 ns_status, ns_status_info = self.RO.check_ns_status(desc)
887 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
888 if ns_status == "ERROR":
889 raise ROclient.ROClientException(ns_status_info)
890 elif ns_status == "BUILD":
891 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
892 elif ns_status == "ACTIVE":
893 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
894 try:
895 self.ns_update_vnfr(db_vnfrs, desc)
896 break
897 except LcmExceptionNoMgmtIP:
898 pass
899 else:
900 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
901 if detailed_status != detailed_status_old:
902 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 await asyncio.sleep(5, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise ROclient.ROClientException("Timeout waiting ns to be ready")
907
908 step = "Updating NSR"
909 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
910
911 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
912 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
913 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
915 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
916
917 step = "Deployed at VIM"
918 self.logger.debug(logging_text + step)
919
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine.

    The method polls every 10 seconds. On each iteration it:
      1. reads the nsr record until '_admin.deployed.RO.nsr_id' is present;
      2. reads the vnfr record until the target vdur is 'ACTIVE' and has an 'ip-address';
      3. if both 'pub_key' and 'user' are provided, asks RO to inject the key, retrying on
         ROClientException up to 20 times.

    :param logging_text: prefix use for logging
    :param nsr_id: _id of the nsrs record
    :param vnfr_id: _id of the vnfrs record
    :param vdu_id: vdu-id-ref of the target vdur; a false value selects the VNF case, where the vdur is the
        one whose ip-address equals the vnfr ip-address
    :param vdu_index: count-index of the target vdur (VDU case only)
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: when the overall number of iterations reaches 360, when the vdur cannot be found,
        when the target VM is in ERROR status, when RO returns an unknown response or when key injection
        fails 20 times
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        # this counter covers every iteration of the loop, whatever it is waiting for
        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10, and the sleep
        # runs on the loop that is executing this coroutine anyway
        await asyncio.sleep(10)

        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        # nothing to inject: the ip address is all what was requested
        if not (pub_key and user):
            break

        # inject public key into machine
        # self.logger.debug(logging_text + "Inserting RO key")
        try:
            ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
            result_dict = await self.RO.create_action(
                item="ns",
                item_id_name=ro_nsr_id,
                descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
            )
            # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
            if not result_dict or not isinstance(result_dict, dict):
                raise LcmException("Unknown response from RO when injecting key")
            # only the first entry is inspected: it either confirms the injection or triggers a retry
            for result in result_dict.values():
                if result.get("vim_result") == 200:
                    break
                else:
                    raise ROclient.ROClientException("error injecting key: {}".format(
                        result.get("description")))
            break
        except ROclient.ROClientException as e:
            if not nb_tries:
                self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                  format(e, 20*10))
            nb_tries += 1
            if nb_tries >= 20:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e

    return ip_address
1016
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the configurations this VCA depends on are READY. NS wait for VNFs and VDUs. VNFs for VDUs

    :param nsr_id: _id of the nsrs record, read again from database on every poll
    :param vca_deployed_list: list of deployed VCA; only the entry at 'vca_index' is used
    :param vca_index: position of this VCA, both at 'vca_deployed_list' and at nsr 'configurationStatus'
    :return: None, as soon as there is not any pending dependency
    :raises LcmException: if a dependency is BROKEN or the polling budget is exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    own_member_index = own_vca.get("member-vnf-index")

    # NOTE(review): this is a number of polls, not seconds. With a sleep of 10 seconds per poll the whole
    # wait is much longer than 300 seconds - confirm that this is the intended timeout
    polls_left = 300
    while polls_left >= 0:
        nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
        # the value is not used, but the lookup raises KeyError if the record has not this section
        nsr_record["_admin"]["deployed"]["VCA"]
        status_entries = nsr_record["configurationStatus"]

        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            if own_member_index and entry.get("member-vnf-index") != own_member_index:
                # it belongs to other vnf: it is not a dependency
                continue
            entry_status = entry.get("status")
            if entry_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if entry_status != 'READY':
                pending = True
                break

        if not pending:
            # no dependencies, return
            return

        await asyncio.sleep(10)
        polls_left -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1050
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
                           vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
    """
    Deploy one VCA element (charm) and run its initial config primitives.

    Steps: create (proxy charm) or register (native charm) the execution environment, install the
    configuration software, add the VCA relations, wait for the VM ip address (injecting the ssh public key
    when 'config-access.ssh-access.required' is set), wait for dependent VCAs and finally execute the
    'initial-config-primitive' list. Progress is written at nsr 'configurationStatus.<vca_index>' and at
    the nslcmop 'stage'.

    :param logging_text: prefix use for logging
    :param vca_index: position of this element at db_nsr '_admin.deployed.VCA'
    :param nsi_id: network slice instance id, used as first part of the namespace. Can be None
    :param db_nsr: nsrs database record. '_admin.deployed.VCA' must contain the entry 'vca_index'
    :param db_vnfr: vnfrs database record, or a false value for a NS level configuration
    :param vdu_id: vdu id for a VDU level configuration, otherwise a false value
    :param kdu_name: not read by this method; the kdu name is taken from the deployed VCA entry
    :param vdu_index: vdu count index
    :param config_descriptor: configuration section of the descriptor. 'juju.charm' is required
    :param deploy_params: parameters for the primitives. It is MODIFIED: 'rw_mgmt_ip' is set, and
        'ns_config_info' too when the VCA entry has not 'member-vnf-index'
    :param base_folder: dictionary with 'folder' and 'pkg-dir', used to compose the charm artifact path
    :param nslcmop_id: operation id where the stage is written
    :return: None
    :raises Exception: any failure is re-raised as Exception with the current step as prefix, after marking
        the configuration status as BROKEN
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = vca_deployed_list[vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        self.fs.sync()  # Sync from FSMongo
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # it is a proxy charm unless 'juju.proxy' is explicitly False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)

        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username:
                # take the first 'ssh-username' parameter found and stop searching
                username = next(
                    (param["value"]
                     for config_primitive in config_descriptor.get("initial-config-primitive") or ()
                     for param in config_primitive.get("parameter", ())
                     if param["name"] == "ssh-username"),
                    None)
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                # best effort: a wrong or missing 'seq' leaves the list in the order of the descriptor
                self.logger.error(logging_text + step + ": " + str(e))
        else:
            self.logger.debug(logging_text + step + ": No initial-config-primitive")

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        if vca_deployed.get("vdu_id") or vca_deployed.get("kdu_name"):
            # VDU or KDU
            stage = 'Stage 3/5: running Day-1 primitives for VDU'
        elif vca_deployed.get("member-vnf-index"):
            # VNF
            stage = 'Stage 4/5: running Day-1 primitives for VNF'
        else:
            # NS
            stage = 'Stage 5/5: running Day-1 primitives for NS'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # .get() is used, as in the stage selection above, to tolerate an entry without this key
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )

            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1293
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None):
    """
    Write the NS state and the current operation at the nsrs database record. Best effort: a failure is
    logged as a warning and it is not propagated.

    :param nsr_id: _id of the nsrs record to update
    :param ns_state: value for 'nsState'. If it is a false value, 'nsState' is not modified
    :param current_operation: value for 'currentOperation'
    :param current_operation_id: value for 'currentOperationID'
    :param error_description: value for 'errorDescription'. It is always written, None included
    :return: None
    """
    try:
        db_dict = {
            "currentOperation": current_operation,
            "currentOperationID": current_operation_id,
            "errorDescription": error_description,
        }
        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1306
def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
    """
    Write the stage of an operation at the nslcmops database record. Best effort: a failure is logged as
    a warning and it is not propagated.

    :param op_id: _id of the nslcmops record to update
    :param stage: value for 'stage'. It is always written, None included
    :param error_message: value for 'errorMessage'. If it is a false value, 'errorMessage' is not modified
    :param queuePosition: value for 'queuePosition'
    :return: None
    """
    try:
        db_dict = {'queuePosition': queuePosition, 'stage': stage}
        if error_message:
            db_dict['errorMessage'] = error_message
        self.update_db_2("nslcmops", op_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1317
def _write_all_config_status(self, nsr_id: str, status: str):
    """
    Set the same status to every entry of 'configurationStatus' at the nsrs database record. Best effort:
    a failure is logged as a warning and it is not propagated.

    :param nsr_id: _id of the nsrs record to read and update
    :param status: value for the 'status' key of every entry
    :return: None. Nothing is written if the record has an empty or missing 'configurationStatus'
    """
    try:
        # nsrs record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # update status at every entry and write the complete list back
            for c in config_status:
                c['status'] = status
            self.update_db_2("nsrs", nsr_id, {'configurationStatus': list(config_status)})

    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1335
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None):
    """
    Write the configuration status of one VCA at 'configurationStatus.<vca_index>' of the nsrs database
    record. Only the provided (true) values are written. Best effort: a failure is logged as a warning
    and it is not propagated.

    :param nsr_id: _id of the nsrs record to update
    :param vca_index: position of the entry at the 'configurationStatus' list
    :param status: value for 'status', skipped if it is a false value
    :param element_under_configuration: value for 'elementUnderConfiguration', skipped if false
    :param element_type: value for 'elementType', skipped if false
    :return: None
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        candidates = (
            ('status', status),
            ('elementUnderConfiguration', element_under_configuration),
            ('elementType', element_type),
        )
        db_dict = {db_path + key: value for key, value in candidates if value}
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1355
async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request a placement computation and store the obtained vim accounts at the vnfrs database records.
    It does nothing unless the operation parameter 'placement-engine' is "PLA".

    The request is sent to the message bus (topic "pla", command "get_placement"); after that the nslcmops
    record is read every 5 seconds, up to 5 times, until '_admin.pla' contains a result.

    :param logging_text: prefix use for logging
    :param db_nslcmop: nslcmops database record; 'id', '_id' and 'operationParams' are read
    :param db_vnfrs: vnfrs database records, indexed by member-vnf-index
    :return: None
    :raises LcmException: if there is not any placement result after the last read
    """
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        return

    self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)

    poll_interval = 5
    max_wait = poll_interval * 4
    placement = None
    # one read after each sleep, with the remaining wait going from max_wait down to 0 both included
    for _ in range(max_wait // poll_interval + 1):
        await asyncio.sleep(poll_interval)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))
        if placement:
            break

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))

    for placed_vnf in placement['vnf']:
        target_vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        # only entries with a vim account and with a known vnfr are stored
        if placed_vnf.get('vimAccountId') and target_vnfr:
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]}, {"vim-account-id": placed_vnf['vimAccountId']})
    return
1379
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result at '_admin.pla' of the nslcmops database record. Best effort: a
    failure is logged as a warning and it is not propagated.

    :param params: received content. 'placement' is stored as it is, and 'placement.nslcmopId' is used as
        the _id of the nslcmops record to update
    :return: None
    """
    # bound before the 'try', because the 'except' clause reads it even when the first statement fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1386
1387 async def instantiate(self, nsr_id, nslcmop_id):
1388 """
1389
1390 :param nsr_id: ns instance to deploy
1391 :param nslcmop_id: operation to run
1392 :return:
1393 """
1394
1395 # Try to lock HA task here
1396 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1397 if not task_is_locked_by_me:
1398 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1399 return
1400
1401 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1402 self.logger.debug(logging_text + "Enter")
1403
1404 # get all needed from database
1405
1406 # database nsrs record
1407 db_nsr = None
1408
1409 # database nslcmops record
1410 db_nslcmop = None
1411
1412 # update operation on nsrs
1413 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1414 "_admin.current-operation": nslcmop_id,
1415 "_admin.operation-type": "instantiate"}
1416 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1417
1418 # update operation on nslcmops
1419 db_nslcmop_update = {}
1420
1421 nslcmop_operation_state = None
1422 db_vnfrs = {} # vnf's info indexed by member-index
1423 # n2vc_info = {}
1424 task_instantiation_list = []
1425 task_instantiation_info = {} # from task to info text
1426 exc = None
1427 try:
1428 # wait for any previous tasks in process
1429 step = "Waiting for previous operations to terminate"
1430 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1431
1432 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1433
1434 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1435 self._write_ns_status(
1436 nsr_id=nsr_id,
1437 ns_state="BUILDING",
1438 current_operation="INSTANTIATING",
1439 current_operation_id=nslcmop_id
1440 )
1441
1442 # read from db: operation
1443 step = "Getting nslcmop={} from db".format(nslcmop_id)
1444 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1445 ns_params = db_nslcmop.get("operationParams")
1446 if ns_params and ns_params.get("timeout_ns_deploy"):
1447 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1448 else:
1449 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1450
1451 # read from db: ns
1452 step = "Getting nsr={} from db".format(nsr_id)
1453 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1454 # nsd is replicated into ns (no db read)
1455 nsd = db_nsr["nsd"]
1456 # nsr_name = db_nsr["name"] # TODO short-name??
1457
1458 # read from db: vnf's of this ns
1459 step = "Getting vnfrs from db"
1460 self.logger.debug(logging_text + step)
1461 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1462
1463 # read from db: vnfd's for every vnf
1464 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1465 db_vnfds = {} # every vnfd data indexed by vnf id
1466 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1467
1468 self._write_op_status(
1469 op_id=nslcmop_id,
1470 stage='Stage 1/5: preparation of the environment',
1471 queuePosition=0
1472 )
1473
1474 # for each vnf in ns, read vnfd
1475 for vnfr in db_vnfrs_list:
1476 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1477 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1478 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1479 # if we haven't this vnfd, read it from db
1480 if vnfd_id not in db_vnfds:
1481 # read from db
1482 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1483 self.logger.debug(logging_text + step)
1484 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1485
1486 # store vnfd
1487 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1488 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1489 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1490
1491 # Get or generates the _admin.deployed.VCA list
1492 vca_deployed_list = None
1493 if db_nsr["_admin"].get("deployed"):
1494 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1495 if vca_deployed_list is None:
1496 vca_deployed_list = []
1497 configuration_status_list = []
1498 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1499 db_nsr_update["configurationStatus"] = configuration_status_list
1500 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1501 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1502 elif isinstance(vca_deployed_list, dict):
1503 # maintain backward compatibility. Change a dict to list at database
1504 vca_deployed_list = list(vca_deployed_list.values())
1505 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1506 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1507
1508 db_nsr_update["detailed-status"] = "creating"
1509 db_nsr_update["operational-status"] = "init"
1510
1511 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1512 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1513 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1514
1515 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1516 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1517 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1518
1519 # n2vc_redesign STEP 2 Deploy Network Scenario
1520
1521 self._write_op_status(
1522 op_id=nslcmop_id,
1523 stage='Stage 2/5: deployment of VMs and execution environments'
1524 )
1525
1526 self.logger.debug(logging_text + "Before deploy_kdus")
1527 # Call to deploy_kdus in case exists the "vdu:kdu" param
1528 task_kdu = asyncio.ensure_future(
1529 self.deploy_kdus(
1530 logging_text=logging_text,
1531 nsr_id=nsr_id,
1532 db_nsr=db_nsr,
1533 db_vnfrs=db_vnfrs,
1534 db_vnfds=db_vnfds
1535 )
1536 )
1537 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1538 task_instantiation_info[task_kdu] = "Deploy KDUs"
1539 task_instantiation_list.append(task_kdu)
1540 # n2vc_redesign STEP 1 Get VCA public ssh-key
1541 # feature 1429. Add n2vc public key to needed VMs
1542 n2vc_key = self.n2vc.get_public_key()
1543 n2vc_key_list = [n2vc_key]
1544 if self.vca_config.get("public_key"):
1545 n2vc_key_list.append(self.vca_config["public_key"])
1546
1547 task_ro = asyncio.ensure_future(
1548 self.instantiate_RO(
1549 logging_text=logging_text,
1550 nsr_id=nsr_id,
1551 nsd=nsd,
1552 db_nsr=db_nsr,
1553 db_nslcmop=db_nslcmop,
1554 db_vnfrs=db_vnfrs,
1555 db_vnfds_ref=db_vnfds_ref,
1556 n2vc_key_list=n2vc_key_list
1557 )
1558 )
1559 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1560 task_instantiation_info[task_ro] = "Deploy at VIM"
1561 task_instantiation_list.append(task_ro)
1562
1563 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1564 step = "Deploying proxy and native charms"
1565 self.logger.debug(logging_text + step)
1566
1567 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1568 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1569 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1570 vnfd_id = c_vnf["vnfd-id-ref"]
1571 vnfd = db_vnfds_ref[vnfd_id]
1572 member_vnf_index = str(c_vnf["member-vnf-index"])
1573 db_vnfr = db_vnfrs[member_vnf_index]
1574 base_folder = vnfd["_admin"]["storage"]
1575 vdu_id = None
1576 vdu_index = 0
1577 vdu_name = None
1578 kdu_name = None
1579
1580 # Get additional parameters
1581 deploy_params = {}
1582 if db_vnfr.get("additionalParamsForVnf"):
1583 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1584
1585 descriptor_config = vnfd.get("vnf-configuration")
1586 if descriptor_config and descriptor_config.get("juju"):
1587 self._deploy_n2vc(
1588 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1589 db_nsr=db_nsr,
1590 db_vnfr=db_vnfr,
1591 nslcmop_id=nslcmop_id,
1592 nsr_id=nsr_id,
1593 nsi_id=nsi_id,
1594 vnfd_id=vnfd_id,
1595 vdu_id=vdu_id,
1596 kdu_name=kdu_name,
1597 member_vnf_index=member_vnf_index,
1598 vdu_index=vdu_index,
1599 vdu_name=vdu_name,
1600 deploy_params=deploy_params,
1601 descriptor_config=descriptor_config,
1602 base_folder=base_folder,
1603 task_instantiation_list=task_instantiation_list,
1604 task_instantiation_info=task_instantiation_info
1605 )
1606
1607 # Deploy charms for each VDU that supports one.
1608 for vdud in get_iterable(vnfd, 'vdu'):
1609 vdu_id = vdud["id"]
1610 descriptor_config = vdud.get('vdu-configuration')
1611 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1612 if vdur.get("additionalParams"):
1613 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1614 else:
1615 deploy_params_vdu = deploy_params
1616 if descriptor_config and descriptor_config.get("juju"):
1617 # look for vdu index in the db_vnfr["vdu"] section
1618 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1619 # if vdur["vdu-id-ref"] == vdu_id:
1620 # break
1621 # else:
1622 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1623 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1624 # vdu_name = vdur.get("name")
1625 vdu_name = None
1626 kdu_name = None
1627 for vdu_index in range(int(vdud.get("count", 1))):
1628 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1629 self._deploy_n2vc(
1630 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1631 member_vnf_index, vdu_id, vdu_index),
1632 db_nsr=db_nsr,
1633 db_vnfr=db_vnfr,
1634 nslcmop_id=nslcmop_id,
1635 nsr_id=nsr_id,
1636 nsi_id=nsi_id,
1637 vnfd_id=vnfd_id,
1638 vdu_id=vdu_id,
1639 kdu_name=kdu_name,
1640 member_vnf_index=member_vnf_index,
1641 vdu_index=vdu_index,
1642 vdu_name=vdu_name,
1643 deploy_params=deploy_params_vdu,
1644 descriptor_config=descriptor_config,
1645 base_folder=base_folder,
1646 task_instantiation_list=task_instantiation_list,
1647 task_instantiation_info=task_instantiation_info
1648 )
1649 for kdud in get_iterable(vnfd, 'kdu'):
1650 kdu_name = kdud["name"]
1651 descriptor_config = kdud.get('kdu-configuration')
1652 if descriptor_config and descriptor_config.get("juju"):
1653 vdu_id = None
1654 vdu_index = 0
1655 vdu_name = None
1656 # look for vdu index in the db_vnfr["vdu"] section
1657 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1658 # if vdur["vdu-id-ref"] == vdu_id:
1659 # break
1660 # else:
1661 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1662 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1663 # vdu_name = vdur.get("name")
1664 # vdu_name = None
1665
1666 self._deploy_n2vc(
1667 logging_text=logging_text,
1668 db_nsr=db_nsr,
1669 db_vnfr=db_vnfr,
1670 nslcmop_id=nslcmop_id,
1671 nsr_id=nsr_id,
1672 nsi_id=nsi_id,
1673 vnfd_id=vnfd_id,
1674 vdu_id=vdu_id,
1675 kdu_name=kdu_name,
1676 member_vnf_index=member_vnf_index,
1677 vdu_index=vdu_index,
1678 vdu_name=vdu_name,
1679 deploy_params=deploy_params,
1680 descriptor_config=descriptor_config,
1681 base_folder=base_folder,
1682 task_instantiation_list=task_instantiation_list,
1683 task_instantiation_info=task_instantiation_info
1684 )
1685
1686 # Check if this NS has a charm configuration
1687 descriptor_config = nsd.get("ns-configuration")
1688 if descriptor_config and descriptor_config.get("juju"):
1689 vnfd_id = None
1690 db_vnfr = None
1691 member_vnf_index = None
1692 vdu_id = None
1693 kdu_name = None
1694 vdu_index = 0
1695 vdu_name = None
1696
1697 # Get additional parameters
1698 deploy_params = {}
1699 if db_nsr.get("additionalParamsForNs"):
1700 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1701 base_folder = nsd["_admin"]["storage"]
1702 self._deploy_n2vc(
1703 logging_text=logging_text,
1704 db_nsr=db_nsr,
1705 db_vnfr=db_vnfr,
1706 nslcmop_id=nslcmop_id,
1707 nsr_id=nsr_id,
1708 nsi_id=nsi_id,
1709 vnfd_id=vnfd_id,
1710 vdu_id=vdu_id,
1711 kdu_name=kdu_name,
1712 member_vnf_index=member_vnf_index,
1713 vdu_index=vdu_index,
1714 vdu_name=vdu_name,
1715 deploy_params=deploy_params,
1716 descriptor_config=descriptor_config,
1717 base_folder=base_folder,
1718 task_instantiation_list=task_instantiation_list,
1719 task_instantiation_info=task_instantiation_info
1720 )
1721
1722 # Wait until all tasks of "task_instantiation_list" have been finished
1723
1724 error_text_list = []
1725
1726 # let's begin with all OK
1727 instantiated_ok = True
1728 # let's begin with RO 'running' status (later we can change it)
1729 db_nsr_update["operational-status"] = "running"
1730 # let's begin with VCA 'configured' status (later we can change it)
1731 db_nsr_update["config-status"] = "configured"
1732
1733 step = "Waiting for tasks to be finished"
1734 if task_instantiation_list:
1735 # wait for all tasks completion
1736 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1737
1738 for task in pending:
1739 instantiated_ok = False
1740 if task in (task_ro, task_kdu):
1741 # RO or KDU task is pending
1742 db_nsr_update["operational-status"] = "failed"
1743 else:
1744 # A N2VC task is pending
1745 db_nsr_update["config-status"] = "failed"
1746 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1747 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1748 for task in done:
1749 if task.cancelled():
1750 instantiated_ok = False
1751 if task in (task_ro, task_kdu):
1752 # RO or KDU task was cancelled
1753 db_nsr_update["operational-status"] = "failed"
1754 else:
1755 # A N2VC was cancelled
1756 db_nsr_update["config-status"] = "failed"
1757 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1758 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1759 else:
1760 exc = task.exception()
1761 if exc:
1762 instantiated_ok = False
1763 if task in (task_ro, task_kdu):
1764 # RO or KDU task raised an exception
1765 db_nsr_update["operational-status"] = "failed"
1766 else:
1767 # A N2VC task raised an exception
1768 db_nsr_update["config-status"] = "failed"
1769 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1770
1771 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1772 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1773 else:
1774 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1775 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1776 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1777 else:
1778 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1779
1780 if error_text_list:
1781 error_text = "\n".join(error_text_list)
1782 db_nsr_update["detailed-status"] = error_text
1783 db_nslcmop_update["detailed-status"] = error_text
1784 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1785 db_nslcmop_update["statusEnteredTime"] = time()
1786 else:
1787 # all is done
1788 db_nsr_update["detailed-status"] = "done"
1789 db_nslcmop_update["detailed-status"] = "done"
1790 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1791 db_nslcmop_update["statusEnteredTime"] = time()
1792
1793 except (ROclient.ROClientException, DbException, LcmException) as e:
1794 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1795 exc = e
1796 except asyncio.CancelledError:
1797 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1798 exc = "Operation was cancelled"
1799 except Exception as e:
1800 exc = traceback.format_exc()
1801 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1802 exc_info=True)
1803 finally:
1804 if exc:
1805 if db_nsr:
1806 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1807 db_nsr_update["operational-status"] = "failed"
1808 db_nsr_update["config-status"] = "failed"
1809 if db_nslcmop:
1810 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1811 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1812 db_nslcmop_update["statusEnteredTime"] = time()
1813 try:
1814 if db_nsr:
1815 db_nsr_update["_admin.nslcmop"] = None
1816 db_nsr_update["_admin.current-operation"] = None
1817 db_nsr_update["_admin.operation-type"] = None
1818 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1819
1820 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1821 ns_state = None
1822 error_description = None
1823 if instantiated_ok:
1824 ns_state = "READY"
1825 else:
1826 ns_state = "BROKEN"
1827 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1828
1829 self._write_ns_status(
1830 nsr_id=nsr_id,
1831 ns_state=ns_state,
1832 current_operation="IDLE",
1833 current_operation_id=None,
1834 error_description=error_description
1835 )
1836
1837 self._write_op_status(
1838 op_id=nslcmop_id,
1839 error_message=error_description
1840 )
1841
1842 if db_nslcmop_update:
1843 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1844
1845 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1846
1847 except DbException as e:
1848 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1849
1850 if nslcmop_operation_state:
1851 try:
1852 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1853 "operationState": nslcmop_operation_state},
1854 loop=self.loop)
1855 except Exception as e:
1856 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1857
1858 self.logger.debug(logging_text + "Exit")
1859 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1860
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """
    Add the descriptor relations in which the VCA at position 'vca_index' takes part.

    Steps:
      1. find all relations (ns-configuration and vnf-configuration) where this VCA is one of the peers
      2. poll the nsr record until both peers of a relation have 'config_sw_installed'
      3. add the relation through N2VC

    A relation is discarded, without being added, when one of its peers is reported as 'BROKEN' at
    'configurationStatus' of the nsr record.

    :param logging_text: prefix used for the log messages
    :param nsr_id: _id of the nsr record at database
    :param vca_index: index of this VCA inside <nsrs>._admin.deployed.VCA
    :param timeout: maximum time, in seconds, to keep polling for the peers
    :return: True when there is nothing left pending (every relation was added or discarded, or there were
        no relations); False on timeout or on any error, that is logged instead of raised
    """

    def _peer_ids(relation):
        # ids of the two entities linked by a relation
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    async def _process_pending(pending, vca_key, db_nsr):
        # Adds every relation of 'pending' whose two peers are ready and discards those with a broken peer.
        # 'vca_key' is the VCA field matched against the entity ids: 'member-vnf-index' for NS relations,
        # 'vdu_id' for VNF relations. 'pending' is modified in place.
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        vca_status_list = db_nsr.get('configurationStatus')
        # iterate over a snapshot, because entries are removed from 'pending' inside the loop
        for r in list(pending):
            from_id, to_id = _peer_ids(r)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                # empty entries are skipped, in the same way as _deploy_n2vc does
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if vca.get(vca_key) == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(vca_key) == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.n2vc.add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=r.get('entities')[0].get('endpoint'),
                    endpoint_2=r.get('entities')[1].get('endpoint'))
                # remove entry from relations list
                pending.remove(r)
                continue

            # check failed peers. VCA and configurationStatus entries are paired by position
            for vca, vca_status in zip(vca_list, vca_status_list or ()):
                if not vca or not isinstance(vca_status, dict):
                    continue
                if vca_status.get('status') == 'BROKEN' and vca.get(vca_key) in (from_id, to_id):
                    # peer broken: remove relation from list (only once, even if both peers are broken)
                    self.logger.debug(logging_text + ' relation discarded, peer is BROKEN: {}'.format(r))
                    pending.remove(r)
                    break

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is a peer
        ns_relations = [
            r for r in (deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) or ())
            if my_vca.get('member-vnf-index') in _peer_ids(r)
        ]

        # read all vnf-configuration relations where this VCA is a peer
        vnf_relations = []
        for vnfd in (db_nsr.get('vnfd-id') or ()):
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
            for r in (deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ()):
                if my_vca.get('vdu_id') in _peer_ids(r):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: add all relations as soon as their peers are ready
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (other tasks update _admin.deployed.VCA and configurationStatus)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            await _process_pending(ns_relations, 'member-vnf-index', db_nsr)
            await _process_pending(vnf_relations, 'vdu_id', db_nsr)

            # finish as soon as nothing is pending, without a useless last sleep
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(5.0)

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2020
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
    """
    Launch the KDUs present at the vnfrs, wait for them and record the result at <nsrs>._admin.deployed.K8s.

    For every "kdur" entry it obtains the uuid of the target k8s cluster, synchronizes the helm repos of that
    cluster (only once per cluster in each call), writes a "_admin.deployed.K8s.<index>" entry and launches
    the install task of the helm or juju connector. Finally it waits for the launched tasks.

    :param logging_text: prefix used for the log messages
    :param nsr_id: _id of the nsr record at database
    :param db_nsr: nsr record. Not used by this method
    :param db_vnfrs: dictionary with the vnfr records; each one may contain a "kdur" list
    :param db_vnfds: dictionary with the vnfd records, looked up by the "vnfd-id" of each vnfr
    :return: None
    :raises LcmException: if some KDU cannot be deployed, or wrapping any other exception raised meanwhile
    """
    deployed_ok = True
    # defined before the 'try' because the 'except' and 'finally' clauses below read them
    step = "Initializing the list of deployed KDUs at database"
    db_nsr_update = {"_admin.deployed.K8s": []}

    # cache: cluster type -> {k8scluster _id at database: uuid used by the connector}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
    # helm clusters whose repos have already been synchronized during this call
    updated_cluster_list = []

    def _get_cluster_id(cluster_id, cluster_type):
        # Returns the connector uuid of a k8s cluster for the given type, reading database only the first time
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                vnfd_id = vnfr_data.get('vnfd-id')
                pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    # NOTE(review): this case does not set deployed_ok to False, so the method can end
                    # without raising although this KDU is not deployed - confirm whether it is intended
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"

                if kdumodel:
                    # check if kdumodel is a file and exists
                    try:
                        # path format: /vnfdid/pkkdir/kdumodel
                        filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                    except Exception:
                        # best effort: it is not a file, kdumodel is used as it comes in the record
                        pass

                step = "Prepare instantiate KDU {} in k8s cluster {}".format(
                    kdur["kdu-name"], kdur["k8s-cluster"]["id"])

                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)

                    # synchronize helm repos only the first time that a cluster is used
                    if k8sclustertype == "chart" and cluster_uuid not in updated_cluster_list:
                        del_repo_list, added_repo_dict = await self.k8sclusterhelm.synchronize_repos(
                            cluster_uuid=cluster_uuid)
                        if del_repo_list or added_repo_dict:
                            unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                            updated = {'_admin.helm_charts_added.' +
                                       item: name for item, name in added_repo_dict.items()}
                            self.logger.debug("repos synchronized, to_delete: {}, to_add: {}".
                                              format(del_repo_list, added_repo_dict))
                            self.db.set_one("k8sclusters", {"_id": kdur["k8s-cluster"]["id"]},
                                            updated, unset=unset)
                        updated_cluster_list.append(cluster_uuid)

                except LcmException as e:
                    error_text = str(e)
                    deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # NOTE(review): 'index' is not incremented here, so the next KDU reuses this position
                    # and overwrites this error entry - confirm whether it is intended
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
                           "{}".format(index)}

                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600,
                                                    kdu_name=kdur["kdu-name"])
                    )

                # value is the database path prefix where the result of the task is written
                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1

        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            pending_list = list(pending_tasks.keys())
            while pending_list:
                done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                             return_when=asyncio.FIRST_COMPLETED)
                if not done_list:  # timeout
                    for task in pending_list:
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                        deployed_ok = False
                    break
                for task in done_list:
                    exc = task.exception()
                    if exc:
                        db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                        deployed_ok = False
                    else:
                        db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)
        self.logger.error(msg)
        raise LcmException(msg) from e
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2165
2166 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2167 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2168 base_folder, task_instantiation_list, task_instantiation_info):
2169 # launch instantiate_N2VC in a asyncio task and register task object
2170 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2171 # if not found, create one entry and update database
2172
2173 # fill db_nsr._admin.deployed.VCA.<index>
2174 vca_index = -1
2175 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2176 if not vca_deployed:
2177 continue
2178 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2179 vca_deployed.get("vdu_id") == vdu_id and \
2180 vca_deployed.get("kdu_name") == kdu_name and \
2181 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2182 break
2183 else:
2184 # not found, create one.
2185 vca_deployed = {
2186 "member-vnf-index": member_vnf_index,
2187 "vdu_id": vdu_id,
2188 "kdu_name": kdu_name,
2189 "vdu_count_index": vdu_index,
2190 "operational-status": "init", # TODO revise
2191 "detailed-status": "", # TODO revise
2192 "step": "initial-deploy", # TODO revise
2193 "vnfd_id": vnfd_id,
2194 "vdu_name": vdu_name,
2195 }
2196 vca_index += 1
2197
2198 # create VCA and configurationStatus in db
2199 db_dict = {
2200 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2201 "configurationStatus.{}".format(vca_index): dict()
2202 }
2203 self.update_db_2("nsrs", nsr_id, db_dict)
2204
2205 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2206
2207 # Launch task
2208 task_n2vc = asyncio.ensure_future(
2209 self.instantiate_N2VC(
2210 logging_text=logging_text,
2211 vca_index=vca_index,
2212 nsi_id=nsi_id,
2213 db_nsr=db_nsr,
2214 db_vnfr=db_vnfr,
2215 vdu_id=vdu_id,
2216 kdu_name=kdu_name,
2217 vdu_index=vdu_index,
2218 deploy_params=deploy_params,
2219 config_descriptor=descriptor_config,
2220 base_folder=base_folder,
2221 nslcmop_id=nslcmop_id
2222 )
2223 )
2224 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2225 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
2226 task_instantiation_list.append(task_n2vc)
2227
2228 # Check if this VNFD has a configured terminate action
2229 def _has_terminate_config_primitive(self, vnfd):
2230 vnf_config = vnfd.get("vnf-configuration")
2231 if vnf_config and vnf_config.get("terminate-config-primitive"):
2232 return True
2233 else:
2234 return False
2235
2236 @staticmethod
2237 def _get_terminate_config_primitive_seq_list(vnfd):
2238 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2239 # No need to check for existing primitive twice, already done before
2240 vnf_config = vnfd.get("vnf-configuration")
2241 seq_list = vnf_config.get("terminate-config-primitive")
2242 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2243 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2244 return seq_list_sorted
2245
2246 @staticmethod
2247 def _create_nslcmop(nsr_id, operation, params):
2248 """
2249 Creates a ns-lcm-opp content to be stored at database.
2250 :param nsr_id: internal id of the instance
2251 :param operation: instantiate, terminate, scale, action, ...
2252 :param params: user parameters for the operation
2253 :return: dictionary following SOL005 format
2254 """
2255 # Raise exception if invalid arguments
2256 if not (nsr_id and operation and params):
2257 raise LcmException(
2258 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2259 now = time()
2260 _id = str(uuid4())
2261 nslcmop = {
2262 "id": _id,
2263 "_id": _id,
2264 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2265 "operationState": "PROCESSING",
2266 "statusEnteredTime": now,
2267 "nsInstanceId": nsr_id,
2268 "lcmOperationType": operation,
2269 "startTime": now,
2270 "isAutomaticInvocation": False,
2271 "operationParams": params,
2272 "isCancelPending": False,
2273 "links": {
2274 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2275 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2276 }
2277 }
2278 return nslcmop
2279
2280 def _format_additional_params(self, params):
2281 params = params or {}
2282 for key, value in params.items():
2283 if str(value).startswith("!!yaml "):
2284 params[key] = yaml.safe_load(value[7:])
2285 return params
2286
2287 def _get_terminate_primitive_params(self, seq, vnf_index):
2288 primitive = seq.get('name')
2289 primitive_params = {}
2290 params = {
2291 "member_vnf_index": vnf_index,
2292 "primitive": primitive,
2293 "primitive_params": primitive_params,
2294 }
2295 desc_params = {}
2296 return self._map_primitive_params(seq, params, desc_params)
2297
2298 # sub-operations
2299
2300 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2301 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2302 if (op.get('operationState') == 'COMPLETED'):
2303 # b. Skip sub-operation
2304 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2305 return self.SUBOPERATION_STATUS_SKIP
2306 else:
2307 # c. Reintent executing sub-operation
2308 # The sub-operation exists, and operationState != 'COMPLETED'
2309 # Update operationState = 'PROCESSING' to indicate a reintent.
2310 operationState = 'PROCESSING'
2311 detailed_status = 'In progress'
2312 self._update_suboperation_status(
2313 db_nslcmop, op_index, operationState, detailed_status)
2314 # Return the sub-operation index
2315 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2316 # with arguments extracted from the sub-operation
2317 return op_index
2318
2319 # Find a sub-operation where all keys in a matching dictionary must match
2320 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2321 def _find_suboperation(self, db_nslcmop, match):
2322 if (db_nslcmop and match):
2323 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2324 for i, op in enumerate(op_list):
2325 if all(op.get(k) == match[k] for k in match):
2326 return i
2327 return self.SUBOPERATION_STATUS_NOT_FOUND
2328
2329 # Update status for a sub-operation given its index
2330 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2331 # Update DB for HA tasks
2332 q_filter = {'_id': db_nslcmop['_id']}
2333 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2334 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2335 self.db.set_one("nslcmops",
2336 q_filter=q_filter,
2337 update_dict=update_dict,
2338 fail_on_empty=False)
2339
2340 # Add sub-operation, return the index of the added sub-operation
2341 # Optionally, set operationState, detailed-status, and operationType
2342 # Status and type are currently set for 'scale' sub-operations:
2343 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2344 # 'detailed-status' : status message
2345 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2346 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2347 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2348 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2349 RO_nsr_id=None, RO_scaling_info=None):
2350 if not (db_nslcmop):
2351 return self.SUBOPERATION_STATUS_NOT_FOUND
2352 # Get the "_admin.operations" list, if it exists
2353 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2354 op_list = db_nslcmop_admin.get('operations')
2355 # Create or append to the "_admin.operations" list
2356 new_op = {'member_vnf_index': vnf_index,
2357 'vdu_id': vdu_id,
2358 'vdu_count_index': vdu_count_index,
2359 'primitive': primitive,
2360 'primitive_params': mapped_primitive_params}
2361 if operationState:
2362 new_op['operationState'] = operationState
2363 if detailed_status:
2364 new_op['detailed-status'] = detailed_status
2365 if operationType:
2366 new_op['lcmOperationType'] = operationType
2367 if RO_nsr_id:
2368 new_op['RO_nsr_id'] = RO_nsr_id
2369 if RO_scaling_info:
2370 new_op['RO_scaling_info'] = RO_scaling_info
2371 if not op_list:
2372 # No existing operations, create key 'operations' with current operation as first list element
2373 db_nslcmop_admin.update({'operations': [new_op]})
2374 op_list = db_nslcmop_admin.get('operations')
2375 else:
2376 # Existing operations, append operation to list
2377 op_list.append(new_op)
2378
2379 db_nslcmop_update = {'_admin.operations': op_list}
2380 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2381 op_index = len(op_list) - 1
2382 return op_index
2383
2384 # Helper methods for scale() sub-operations
2385
2386 # pre-scale/post-scale:
2387 # Check for 3 different cases:
2388 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2389 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2390 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2391 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2392 operationType, RO_nsr_id=None, RO_scaling_info=None):
2393 # Find this sub-operation
2394 if (RO_nsr_id and RO_scaling_info):
2395 operationType = 'SCALE-RO'
2396 match = {
2397 'member_vnf_index': vnf_index,
2398 'RO_nsr_id': RO_nsr_id,
2399 'RO_scaling_info': RO_scaling_info,
2400 }
2401 else:
2402 match = {
2403 'member_vnf_index': vnf_index,
2404 'primitive': vnf_config_primitive,
2405 'primitive_params': primitive_params,
2406 'lcmOperationType': operationType
2407 }
2408 op_index = self._find_suboperation(db_nslcmop, match)
2409 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2410 # a. New sub-operation
2411 # The sub-operation does not exist, add it.
2412 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2413 # The following parameters are set to None for all kind of scaling:
2414 vdu_id = None
2415 vdu_count_index = None
2416 vdu_name = None
2417 if (RO_nsr_id and RO_scaling_info):
2418 vnf_config_primitive = None
2419 primitive_params = None
2420 else:
2421 RO_nsr_id = None
2422 RO_scaling_info = None
2423 # Initial status for sub-operation
2424 operationState = 'PROCESSING'
2425 detailed_status = 'In progress'
2426 # Add sub-operation for pre/post-scaling (zero or more operations)
2427 self._add_suboperation(db_nslcmop,
2428 vnf_index,
2429 vdu_id,
2430 vdu_count_index,
2431 vdu_name,
2432 vnf_config_primitive,
2433 primitive_params,
2434 operationState,
2435 detailed_status,
2436 operationType,
2437 RO_nsr_id,
2438 RO_scaling_info)
2439 return self.SUBOPERATION_STATUS_NEW
2440 else:
2441 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2442 # or op_index (operationState != 'COMPLETED')
2443 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2444
2445 # Function to return execution_environment id
2446
2447 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2448 for vca in vca_deployed_list:
2449 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2450 return vca["ee_id"]
2451
2452 # Helper methods for terminate()
2453
2454 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2455 """ Create a primitive with params from VNFD
2456 Called from terminate() before deleting instance
2457 Calls action() to execute the primitive """
2458 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2459 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2460 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2461 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2462 db_vnfds = {}
2463 # Loop over VNFRs
2464 for vnfr in db_vnfrs_list:
2465 vnfd_id = vnfr["vnfd-id"]
2466 vnf_index = vnfr["member-vnf-index-ref"]
2467 if vnfd_id not in db_vnfds:
2468 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2469 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2470 db_vnfds[vnfd_id] = vnfd
2471 vnfd = db_vnfds[vnfd_id]
2472 if not self._has_terminate_config_primitive(vnfd):
2473 continue
2474 # Get the primitive's sorted sequence list
2475 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2476 for seq in seq_list:
2477 # For each sequence in list, get primitive and call _ns_execute_primitive()
2478 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2479 vnf_index, seq.get("name"))
2480 self.logger.debug(logging_text + step)
2481 # Create the primitive for each sequence, i.e. "primitive": "touch"
2482 primitive = seq.get('name')
2483 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2484 # The following 3 parameters are currently set to None for 'terminate':
2485 # vdu_id, vdu_count_index, vdu_name
2486 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2487 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2488 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2489 # Add sub-operation
2490 self._add_suboperation(db_nslcmop,
2491 nslcmop_id,
2492 vnf_index,
2493 vdu_id,
2494 vdu_count_index,
2495 vdu_name,
2496 primitive,
2497 mapped_primitive_params)
2498 # Sub-operations: Call _ns_execute_primitive() instead of action()
2499 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2500 # nsr_deployed = db_nsr["_admin"]["deployed"]
2501
2502 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2503 # nsr_id, nslcmop_terminate_action_id)
2504 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2505 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2506 # if result not in result_ok:
2507 # raise LcmException(
2508 # "terminate_primitive_action for vnf_member_index={}",
2509 # " primitive={} fails with error {}".format(
2510 # vnf_index, seq.get("name"), result_detail))
2511
2512 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2513 try:
2514 await self.n2vc.exec_primitive(
2515 ee_id=ee_id,
2516 primitive_name=primitive,
2517 params_dict=mapped_primitive_params
2518 )
2519 except Exception as e:
2520 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2521 raise LcmException(
2522 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2523 .format(vnf_index, seq.get("name"), e),
2524 )
2525
2526 async def _delete_N2VC(self, nsr_id: str):
2527 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2528 namespace = "." + nsr_id
2529 await self.n2vc.delete_namespace(namespace=namespace)
2530 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2531
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.

    Nothing is done unless this worker obtains the HA lock of the operation. Otherwise the steps are:
    wait for related operations, run the terminate primitives (_terminate_action), launch the deletion
    of the execution environments and of the KDUs as background tasks, delete the ns, nsd and vnfds
    from RO and, in the 'finally' part, update the "nslcmops"/"nsrs" records, write the ns and
    operation status, send the "terminated" notification and wait for the background tasks.

    A ns whose "_admin.nsState" is already "NOT_INSTANTIATED" needs no work and is reported as a
    successful (no-op) termination.

    :param nsr_id: _id of the "nsrs" record to terminate
    :param nslcmop_id: _id of the "nslcmops" record of this operation
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "terminate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    pending_tasks = []
    # Read by the 'finally' part on every exit path (early return and exceptions included),
    # so it must be bound before entering the try block
    terminate_ok = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0
        )

        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to delete and nothing has failed
            terminate_ok = True
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        # Call internal terminate action
        await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        # remove NS
        try:
            step = "delete execution environment"
            self.logger.debug(logging_text + step)

            # runs in background; it is awaited at the end, in the 'finally' part
            task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
            pending_tasks.append(task_delete_ee)
        except Exception as e:
            msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
            self.logger.error(msg)
            failed_detail.append(msg)

        try:
            # Delete from k8scluster
            step = "delete kdus"
            self.logger.debug(logging_text + step)
            if nsr_deployed:
                for kdu in nsr_deployed.get("K8s", ()):
                    kdu_instance = kdu.get("kdu-instance")
                    if not kdu_instance:
                        continue
                    if kdu.get("k8scluster-type") == "chart":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    elif kdu.get("k8scluster-type") == "juju":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    else:
                        self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                          format(kdu.get("k8scluster-type")))
                        continue
                    pending_tasks.append(task_delete_kdu_instance)
        except LcmException as e:
            msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
            self.logger.error(msg)
            failed_detail.append(msg)

        # remove from RO
        RO_fail = False

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns from VIM"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await self.RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=RO_nsr_id,
                        extra_item="action",
                        extra_item_id=RO_delete_action)

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    # no 'loop' argument: it was removed from asyncio.sleep() in python 3.10
                    await asyncio.sleep(5)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd from RO"
                await self.RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await self.RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            terminate_ok = False
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            terminate_ok = True
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                if terminate_ok and not exc:
                    ns_state = "IDLE"
                    error_description = None
                else:
                    ns_state = "BROKEN"
                    # an exception leaves failed_detail empty: report the exception instead
                    error_detail = "; ".join(failed_detail) or str(exc)
                    error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
                        .format(nslcmop_id, step, error_detail)

                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description
                )

                self._write_op_status(
                    op_id=nslcmop_id,
                    error_message=error_description
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        # wait for pending tasks
        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            _, pending = await asyncio.wait(pending_tasks, timeout=3600)
            if not pending:
                self.logger.debug(logging_text + 'All tasks finished...')
            else:
                self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2837
2838 @staticmethod
2839 def _map_primitive_params(primitive_desc, params, instantiation_params):
2840 """
2841 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2842 The default-value is used. If it is between < > it look for a value at instantiation_params
2843 :param primitive_desc: portion of VNFD/NSD that describes primitive
2844 :param params: Params provided by user
2845 :param instantiation_params: Instantiation params provided by user
2846 :return: a dictionary with the calculated params
2847 """
2848 calculated_params = {}
2849 for parameter in primitive_desc.get("parameter", ()):
2850 param_name = parameter["name"]
2851 if param_name in params:
2852 calculated_params[param_name] = params[param_name]
2853 elif "default-value" in parameter or "value" in parameter:
2854 if "value" in parameter:
2855 calculated_params[param_name] = parameter["value"]
2856 else:
2857 calculated_params[param_name] = parameter["default-value"]
2858 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2859 and calculated_params[param_name].endswith(">"):
2860 if calculated_params[param_name][1:-1] in instantiation_params:
2861 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2862 else:
2863 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2864 format(calculated_params[param_name], primitive_desc["name"]))
2865 else:
2866 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2867 format(param_name, primitive_desc["name"]))
2868
2869 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2870 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2871 width=256)
2872 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2873 calculated_params[param_name] = calculated_params[param_name][7:]
2874
2875 # add always ns_config_info if primitive name is config
2876 if primitive_desc["name"] == "config":
2877 if "ns_config_info" in instantiation_params:
2878 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2879 return calculated_params
2880
2881 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2882 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2883
2884 # find vca_deployed record for this action
2885 try:
2886 for vca_deployed in db_deployed["VCA"]:
2887 if not vca_deployed:
2888 continue
2889 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2890 continue
2891 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2892 continue
2893 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2894 continue
2895 break
2896 else:
2897 # vca_deployed not found
2898 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2899 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2900
2901 # get ee_id
2902 ee_id = vca_deployed.get("ee_id")
2903 if not ee_id:
2904 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2905 "execution environment"
2906 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2907
2908 if primitive == "config":
2909 primitive_params = {"params": primitive_params}
2910
2911 while retries >= 0:
2912 try:
2913 output = await self.n2vc.exec_primitive(
2914 ee_id=ee_id,
2915 primitive_name=primitive,
2916 params_dict=primitive_params
2917 )
2918 # execution was OK
2919 break
2920 except Exception as e:
2921 retries -= 1
2922 if retries >= 0:
2923 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2924 # wait and retry
2925 await asyncio.sleep(retries_interval, loop=self.loop)
2926 else:
2927 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2928
2929 return output, 'OK'
2930
2931 except Exception as e:
2932 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2933
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the action (primitive) described by a nslcmop record over a ns, vnf, vdu or kdu.

    Nothing is done unless this worker obtains the HA lock of the operation. The target is chosen from
    the "operationParams" of the nslcmop: a "vdu_id" selects a vdu config primitive, a "kdu_name" selects
    a KDU (only the primitives "upgrade", "rollback" and "status" are handled), a "member_vnf_index"
    alone selects a vnf config primitive and none of them selects a ns config primitive. In the 'finally'
    part the "nslcmops"/"nsrs" records are updated and the "actioned" notification is sent.

    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: _id of the "nslcmops" record of this operation
    :return: None when the HA lock is not obtained; otherwise the tuple (operation state, detail), where
        the state is 'COMPLETED' or 'FAILED' (None if the nslcmop record could not be read)
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")    # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if kdur is None:
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            index = 0
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if kdu_name == kdu["kdu-name"]:
                    db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                               "path": "_admin.deployed.K8s.{}".format(index)}
                    if primitive == "upgrade":
                        if desc_params.get("kdu_model"):
                            kdu_model = desc_params.get("kdu_model")
                            del desc_params["kdu_model"]
                        else:
                            kdu_model = kdu.get("kdu-model")
                            parts = kdu_model.split(sep=":")
                            if len(parts) == 2:
                                kdu_model = parts[0]

                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                       kdu_instance=kdu.get("kdu-instance"),
                                                                       atomic=True, kdu_model=kdu_model,
                                                                       params=desc_params, db_dict=db_dict,
                                                                       timeout=300)

                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)

                        self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                        break
                    elif primitive == "rollback":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                        kdu_instance=kdu.get("kdu-instance"),
                                                                        db_dict=db_dict)
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                    elif primitive == "status":
                        if kdu.get("k8scluster-type") == "chart":
                            output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        elif kdu.get("k8scluster-type") == "juju":
                            output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                                          kdu_instance=kdu.get("kdu-instance"))
                        else:
                            msg = "k8scluster-type not defined"
                            raise LcmException(msg)
                        break
                index += 1

            else:
                # reached when no deployed KDU has executed the primitive (the loop ended without 'break')
                raise LcmException("KDU '{}' not found".format(kdu_name))
            if output:
                detailed_status, result = output, 'COMPLETED'
            else:
                detailed_status, result = '', 'FAILED'
            # the state and detail are also kept in the local variables, as in the charm path below,
            # so that the notification is sent and the result is returned from the 'finally' part
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
            db_nslcmop_update["operationState"] = nslcmop_operation_state = result
            db_nslcmop_update["statusEnteredTime"] = time()
            return
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # a vdu without record keeps the vnf level params
                if vdur and vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
            if exc:
                self._write_op_status(
                    op_id=nslcmop_id,
                    error_message=nslcmop_operation_state_detail
                )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # returning from 'finally' replaces the bare 'return' of the try block
        return nslcmop_operation_state, nslcmop_operation_state_detail
3182
async def scale(self, nsr_id, nslcmop_id):
    """
    Run the scale operation 'nslcmop_id' over the network service 'nsr_id'.

    Sequence followed:
      1. Take the HA lock of the operation; if it is not obtained the method returns without doing anything.
      2. Wait for previous related operations, then read nslcmop, nsr, vnfr and vnfd from the database.
      3. From the requested scaling-group-descriptor compute the VDUs to create (SCALE_OUT) or delete
         (SCALE_IN), checking the max/min instance count.
      4. Execute the 'pre-scale-in'/'pre-scale-out' scaling-config-action primitives.
      5. Send a "vdu-scaling" action to RO and poll it every 5 seconds, for at most one hour, until it is
         done and the vnfr can be updated.
      6. Execute the 'post-scale-in'/'post-scale-out' scaling-config-action primitives.
      7. In all cases (success or failure) write the final status of nslcmop and nsr in the database, send
         the "scaled" message at topic "ns" if an operation state was set, and remove the task "ns_scale".

    Errors raised inside the main sequence are caught, logged and stored as a FAILED operation.

    :param nsr_id: identity of the network service record (nsrs collection) to scale
    :param nslcmop_id: identity of the operation (nslcmops collection) that contains the scaling parameters
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "scale"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # keep previous status to restore them at the end of the operation
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        # after this loop 'scaling_descriptor' is the descriptor whose name matches the requested group
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: number of scale-out operations currently applied over this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        else:
            # without this check the failure would be an obscure KeyError 'scaling_direction' below
            raise LcmException("Invalid input parameter 'scaleVnfData':'scaleVnfType':'{}'. Expected 'SCALE_OUT' "
                               "or 'SCALE_IN'".format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy, because the counters are decremented while selecting the vdur to delete
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # last vdur entries are selected first
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # restore the original (not decremented) counters; they are not passed to the primitives
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was looked for; 'config_primitive' may not even be bound here
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            # NOTE(review): this call does not pass 'nslcmop_id' whereas the PRE-SCALE/POST-SCALE calls do;
            # one of both forms does not match the helper signature - TODO confirm against its definition
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        # first phase: wait for the RO action to finish
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            # explicit exception instead of 'assert', that is removed when running with -O
                            raise LcmException(
                                "ROclient.check_action_status returns unknown {}".format(ns_status))
                    else:
                        # second phase: update the vnfr with the information obtained from RO
                        # NOTE(review): ns_status is not refreshed in this phase, so it keeps the "ACTIVE"
                        # value that ended the first phase
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not available yet: try again at next iteration
                                pass
                        else:
                            raise LcmException(
                                "ROclient.check_ns_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same four arguments as the rest of calls to _update_suboperation_status of this method
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was looked for; 'config_primitive' may not even be bound here
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(scaling_group,
                                                                                                 vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        result, result_detail = await self._ns_execute_primitive(
                            nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        # the 'finally' clause below writes the final status and sends the notification
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                db_nsr_update["_admin.nslcmop"] = None
                # mark as failed the part (VCA or RO) that was in progress when the error happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None
            )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                # notification is best effort: a failure here must not change the result of the operation
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")