Merge branch 'v7.0' tag v7.0.1
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 class NsLcm(LcmBase):
47 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
48 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
49 timeout_charm_delete = 10 * 60
50 timeout_primitive = 10 * 60 # timeout for primitive execution
51
52 SUBOPERATION_STATUS_NOT_FOUND = -1
53 SUBOPERATION_STATUS_NEW = -2
54 SUBOPERATION_STATUS_SKIP = -3
55
56 def __init__(self, db, msg, fs, lcm_tasks, config, loop):
57 """
58 Init, Connect to database, filesystem storage, and messaging
59 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
60 :return: None
61 """
62 super().__init__(
63 db=db,
64 msg=msg,
65 fs=fs,
66 logger=logging.getLogger('lcm.ns')
67 )
68
69 self.loop = loop
70 self.lcm_tasks = lcm_tasks
71 self.timeout = config["timeout"]
72 self.ro_config = config["ro_config"]
73 self.vca_config = config["VCA"].copy()
74
75 # create N2VC connector
76 self.n2vc = N2VCJujuConnector(
77 db=self.db,
78 fs=self.fs,
79 log=self.logger,
80 loop=self.loop,
81 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
82 username=self.vca_config.get('user', None),
83 vca_config=self.vca_config,
84 on_update_db=self._on_update_n2vc_db
85 )
86
87 self.k8sclusterhelm = K8sHelmConnector(
88 kubectl_command=self.vca_config.get("kubectlpath"),
89 helm_command=self.vca_config.get("helmpath"),
90 fs=self.fs,
91 log=self.logger,
92 db=self.db,
93 on_update_db=None,
94 )
95
96 self.k8sclusterjuju = K8sJujuConnector(
97 kubectl_command=self.vca_config.get("kubectlpath"),
98 juju_command=self.vca_config.get("jujupath"),
99 fs=self.fs,
100 log=self.logger,
101 db=self.db,
102 on_update_db=None,
103 )
104
105 # create RO client
106 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
107
108 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
109
110 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
111
112 try:
113 # TODO filter RO descriptor fields...
114
115 # write to database
116 db_dict = dict()
117 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
118 db_dict['deploymentStatus'] = ro_descriptor
119 self.update_db_2("nsrs", nsrs_id, db_dict)
120
121 except Exception as e:
122 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
123
124 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
125
126 # remove last dot from path (if exists)
127 if path.endswith('.'):
128 path = path[:-1]
129
130 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
131 # .format(table, filter, path, updated_data))
132
133 try:
134
135 nsr_id = filter.get('_id')
136
137 # read ns record from database
138 nsr = self.db.get_one(table='nsrs', q_filter=filter)
139 current_ns_status = nsr.get('nsState')
140
141 # get vca status for NS
142 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
143
144 # vcaStatus
145 db_dict = dict()
146 db_dict['vcaStatus'] = status_dict
147
148 # update configurationStatus for this VCA
149 try:
150 vca_index = int(path[path.rfind(".")+1:])
151
152 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
153 vca_status = vca_list[vca_index].get('status')
154
155 configuration_status_list = nsr.get('configurationStatus')
156 config_status = configuration_status_list[vca_index].get('status')
157
158 if config_status == 'BROKEN' and vca_status != 'failed':
159 db_dict['configurationStatus'][vca_index] = 'READY'
160 elif config_status != 'BROKEN' and vca_status == 'failed':
161 db_dict['configurationStatus'][vca_index] = 'BROKEN'
162 except Exception as e:
163 # not update configurationStatus
164 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
165
166 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
167 # if nsState = 'DEGRADED' check if all is OK
168 is_degraded = False
169 if current_ns_status in ('READY', 'DEGRADED'):
170 error_description = ''
171 # check machines
172 if status_dict.get('machines'):
173 for machine_id in status_dict.get('machines'):
174 machine = status_dict.get('machines').get(machine_id)
175 # check machine agent-status
176 if machine.get('agent-status'):
177 s = machine.get('agent-status').get('status')
178 if s != 'started':
179 is_degraded = True
180 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
181 # check machine instance status
182 if machine.get('instance-status'):
183 s = machine.get('instance-status').get('status')
184 if s != 'running':
185 is_degraded = True
186 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
187 # check applications
188 if status_dict.get('applications'):
189 for app_id in status_dict.get('applications'):
190 app = status_dict.get('applications').get(app_id)
191 # check application status
192 if app.get('status'):
193 s = app.get('status').get('status')
194 if s != 'active':
195 is_degraded = True
196 error_description += 'application {} status={} ; '.format(app_id, s)
197
198 if error_description:
199 db_dict['errorDescription'] = error_description
200 if current_ns_status == 'READY' and is_degraded:
201 db_dict['nsState'] = 'DEGRADED'
202 if current_ns_status == 'DEGRADED' and not is_degraded:
203 db_dict['nsState'] = 'READY'
204
205 # write to database
206 self.update_db_2("nsrs", nsr_id, db_dict)
207
208 except Exception as e:
209 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
210
211 return
212
213 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
214 """
215 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
216 :param vnfd: input vnfd
217 :param new_id: overrides vnf id if provided
218 :param additionalParams: Instantiation params for VNFs provided
219 :param nsrId: Id of the NSR
220 :return: copy of vnfd
221 """
222 try:
223 vnfd_RO = deepcopy(vnfd)
224 # remove unused by RO configuration, monitoring, scaling and internal keys
225 vnfd_RO.pop("_id", None)
226 vnfd_RO.pop("_admin", None)
227 vnfd_RO.pop("vnf-configuration", None)
228 vnfd_RO.pop("monitoring-param", None)
229 vnfd_RO.pop("scaling-group-descriptor", None)
230 vnfd_RO.pop("kdu", None)
231 vnfd_RO.pop("k8s-cluster", None)
232 if new_id:
233 vnfd_RO["id"] = new_id
234
235 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
236 for vdu in get_iterable(vnfd_RO, "vdu"):
237 cloud_init_file = None
238 if vdu.get("cloud-init-file"):
239 base_folder = vnfd["_admin"]["storage"]
240 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
241 vdu["cloud-init-file"])
242 with self.fs.file_open(cloud_init_file, "r") as ci_file:
243 cloud_init_content = ci_file.read()
244 vdu.pop("cloud-init-file", None)
245 elif vdu.get("cloud-init"):
246 cloud_init_content = vdu["cloud-init"]
247 else:
248 continue
249
250 env = Environment()
251 ast = env.parse(cloud_init_content)
252 mandatory_vars = meta.find_undeclared_variables(ast)
253 if mandatory_vars:
254 for var in mandatory_vars:
255 if not additionalParams or var not in additionalParams.keys():
256 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
257 "file, must be provided in the instantiation parameters inside the "
258 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
259 template = Template(cloud_init_content)
260 cloud_init_content = template.render(additionalParams or {})
261 vdu["cloud-init"] = cloud_init_content
262
263 return vnfd_RO
264 except FsException as e:
265 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
266 format(vnfd["id"], vdu["id"], cloud_init_file, e))
267 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
268 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
269 format(vnfd["id"], vdu["id"], e))
270
271 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
272 """
273 Creates a RO ns descriptor from OSM ns_instantiate params
274 :param ns_params: OSM instantiate params
275 :return: The RO ns descriptor
276 """
277 vim_2_RO = {}
278 wim_2_RO = {}
279 # TODO feature 1417: Check that no instantiation is set over PDU
280 # check if PDU forces a concrete vim-network-id and add it
281 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
282
283 def vim_account_2_RO(vim_account):
284 if vim_account in vim_2_RO:
285 return vim_2_RO[vim_account]
286
287 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
288 if db_vim["_admin"]["operationalState"] != "ENABLED":
289 raise LcmException("VIM={} is not available. operationalState={}".format(
290 vim_account, db_vim["_admin"]["operationalState"]))
291 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
292 vim_2_RO[vim_account] = RO_vim_id
293 return RO_vim_id
294
295 def wim_account_2_RO(wim_account):
296 if isinstance(wim_account, str):
297 if wim_account in wim_2_RO:
298 return wim_2_RO[wim_account]
299
300 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
301 if db_wim["_admin"]["operationalState"] != "ENABLED":
302 raise LcmException("WIM={} is not available. operationalState={}".format(
303 wim_account, db_wim["_admin"]["operationalState"]))
304 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
305 wim_2_RO[wim_account] = RO_wim_id
306 return RO_wim_id
307 else:
308 return wim_account
309
310 def ip_profile_2_RO(ip_profile):
311 RO_ip_profile = deepcopy((ip_profile))
312 if "dns-server" in RO_ip_profile:
313 if isinstance(RO_ip_profile["dns-server"], list):
314 RO_ip_profile["dns-address"] = []
315 for ds in RO_ip_profile.pop("dns-server"):
316 RO_ip_profile["dns-address"].append(ds['address'])
317 else:
318 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
319 if RO_ip_profile.get("ip-version") == "ipv4":
320 RO_ip_profile["ip-version"] = "IPv4"
321 if RO_ip_profile.get("ip-version") == "ipv6":
322 RO_ip_profile["ip-version"] = "IPv6"
323 if "dhcp-params" in RO_ip_profile:
324 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
325 return RO_ip_profile
326
327 if not ns_params:
328 return None
329 RO_ns_params = {
330 # "name": ns_params["nsName"],
331 # "description": ns_params.get("nsDescription"),
332 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
333 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
334 # "scenario": ns_params["nsdId"],
335 }
336
337 n2vc_key_list = n2vc_key_list or []
338 for vnfd_ref, vnfd in vnfd_dict.items():
339 vdu_needed_access = []
340 mgmt_cp = None
341 if vnfd.get("vnf-configuration"):
342 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
343 if ssh_required and vnfd.get("mgmt-interface"):
344 if vnfd["mgmt-interface"].get("vdu-id"):
345 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
346 elif vnfd["mgmt-interface"].get("cp"):
347 mgmt_cp = vnfd["mgmt-interface"]["cp"]
348
349 for vdu in vnfd.get("vdu", ()):
350 if vdu.get("vdu-configuration"):
351 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
352 if ssh_required:
353 vdu_needed_access.append(vdu["id"])
354 elif mgmt_cp:
355 for vdu_interface in vdu.get("interface"):
356 if vdu_interface.get("external-connection-point-ref") and \
357 vdu_interface["external-connection-point-ref"] == mgmt_cp:
358 vdu_needed_access.append(vdu["id"])
359 mgmt_cp = None
360 break
361
362 if vdu_needed_access:
363 for vnf_member in nsd.get("constituent-vnfd"):
364 if vnf_member["vnfd-id-ref"] != vnfd_ref:
365 continue
366 for vdu in vdu_needed_access:
367 populate_dict(RO_ns_params,
368 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
369 n2vc_key_list)
370
371 if ns_params.get("vduImage"):
372 RO_ns_params["vduImage"] = ns_params["vduImage"]
373
374 if ns_params.get("ssh_keys"):
375 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
376 for vnf_params in get_iterable(ns_params, "vnf"):
377 for constituent_vnfd in nsd["constituent-vnfd"]:
378 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
379 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
380 break
381 else:
382 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
383 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
384 if vnf_params.get("vimAccountId"):
385 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
386 vim_account_2_RO(vnf_params["vimAccountId"]))
387
388 for vdu_params in get_iterable(vnf_params, "vdu"):
389 # TODO feature 1417: check that this VDU exist and it is not a PDU
390 if vdu_params.get("volume"):
391 for volume_params in vdu_params["volume"]:
392 if volume_params.get("vim-volume-id"):
393 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
394 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
395 volume_params["vim-volume-id"])
396 if vdu_params.get("interface"):
397 for interface_params in vdu_params["interface"]:
398 if interface_params.get("ip-address"):
399 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
400 vdu_params["id"], "interfaces", interface_params["name"],
401 "ip_address"),
402 interface_params["ip-address"])
403 if interface_params.get("mac-address"):
404 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
405 vdu_params["id"], "interfaces", interface_params["name"],
406 "mac_address"),
407 interface_params["mac-address"])
408 if interface_params.get("floating-ip-required"):
409 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
410 vdu_params["id"], "interfaces", interface_params["name"],
411 "floating-ip"),
412 interface_params["floating-ip-required"])
413
414 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
415 if internal_vld_params.get("vim-network-name"):
416 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
417 internal_vld_params["name"], "vim-network-name"),
418 internal_vld_params["vim-network-name"])
419 if internal_vld_params.get("vim-network-id"):
420 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
421 internal_vld_params["name"], "vim-network-id"),
422 internal_vld_params["vim-network-id"])
423 if internal_vld_params.get("ip-profile"):
424 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
425 internal_vld_params["name"], "ip-profile"),
426 ip_profile_2_RO(internal_vld_params["ip-profile"]))
427 if internal_vld_params.get("provider-network"):
428
429 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
430 internal_vld_params["name"], "provider-network"),
431 internal_vld_params["provider-network"].copy())
432
433 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
434 # look for interface
435 iface_found = False
436 for vdu_descriptor in vnf_descriptor["vdu"]:
437 for vdu_interface in vdu_descriptor["interface"]:
438 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
439 if icp_params.get("ip-address"):
440 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
441 vdu_descriptor["id"], "interfaces",
442 vdu_interface["name"], "ip_address"),
443 icp_params["ip-address"])
444
445 if icp_params.get("mac-address"):
446 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
447 vdu_descriptor["id"], "interfaces",
448 vdu_interface["name"], "mac_address"),
449 icp_params["mac-address"])
450 iface_found = True
451 break
452 if iface_found:
453 break
454 else:
455 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
456 "internal-vld:id-ref={} is not present at vnfd:internal-"
457 "connection-point".format(vnf_params["member-vnf-index"],
458 icp_params["id-ref"]))
459
460 for vld_params in get_iterable(ns_params, "vld"):
461 if "ip-profile" in vld_params:
462 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
463 ip_profile_2_RO(vld_params["ip-profile"]))
464
465 if vld_params.get("provider-network"):
466
467 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
468 vld_params["provider-network"].copy())
469
470 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
471 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
472 wim_account_2_RO(vld_params["wimAccountId"])),
473 if vld_params.get("vim-network-name"):
474 RO_vld_sites = []
475 if isinstance(vld_params["vim-network-name"], dict):
476 for vim_account, vim_net in vld_params["vim-network-name"].items():
477 RO_vld_sites.append({
478 "netmap-use": vim_net,
479 "datacenter": vim_account_2_RO(vim_account)
480 })
481 else: # isinstance str
482 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
483 if RO_vld_sites:
484 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
485
486 if vld_params.get("vim-network-id"):
487 RO_vld_sites = []
488 if isinstance(vld_params["vim-network-id"], dict):
489 for vim_account, vim_net in vld_params["vim-network-id"].items():
490 RO_vld_sites.append({
491 "netmap-use": vim_net,
492 "datacenter": vim_account_2_RO(vim_account)
493 })
494 else: # isinstance str
495 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
496 if RO_vld_sites:
497 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
498 if vld_params.get("ns-net"):
499 if isinstance(vld_params["ns-net"], dict):
500 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
501 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
502 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
503 if "vnfd-connection-point-ref" in vld_params:
504 for cp_params in vld_params["vnfd-connection-point-ref"]:
505 # look for interface
506 for constituent_vnfd in nsd["constituent-vnfd"]:
507 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
508 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
509 break
510 else:
511 raise LcmException(
512 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
513 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
514 match_cp = False
515 for vdu_descriptor in vnf_descriptor["vdu"]:
516 for interface_descriptor in vdu_descriptor["interface"]:
517 if interface_descriptor.get("external-connection-point-ref") == \
518 cp_params["vnfd-connection-point-ref"]:
519 match_cp = True
520 break
521 if match_cp:
522 break
523 else:
524 raise LcmException(
525 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
526 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
527 cp_params["member-vnf-index-ref"],
528 cp_params["vnfd-connection-point-ref"],
529 vnf_descriptor["id"]))
530 if cp_params.get("ip-address"):
531 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
532 vdu_descriptor["id"], "interfaces",
533 interface_descriptor["name"], "ip_address"),
534 cp_params["ip-address"])
535 if cp_params.get("mac-address"):
536 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
537 vdu_descriptor["id"], "interfaces",
538 interface_descriptor["name"], "mac_address"),
539 cp_params["mac-address"])
540 return RO_ns_params
541
542 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
543 # make a copy to do not change
544 vdu_create = copy(vdu_create)
545 vdu_delete = copy(vdu_delete)
546
547 vdurs = db_vnfr.get("vdur")
548 if vdurs is None:
549 vdurs = []
550 vdu_index = len(vdurs)
551 while vdu_index:
552 vdu_index -= 1
553 vdur = vdurs[vdu_index]
554 if vdur.get("pdu-type"):
555 continue
556 vdu_id_ref = vdur["vdu-id-ref"]
557 if vdu_create and vdu_create.get(vdu_id_ref):
558 for index in range(0, vdu_create[vdu_id_ref]):
559 vdur = deepcopy(vdur)
560 vdur["_id"] = str(uuid4())
561 vdur["count-index"] += 1
562 vdurs.insert(vdu_index+1+index, vdur)
563 del vdu_create[vdu_id_ref]
564 if vdu_delete and vdu_delete.get(vdu_id_ref):
565 del vdurs[vdu_index]
566 vdu_delete[vdu_id_ref] -= 1
567 if not vdu_delete[vdu_id_ref]:
568 del vdu_delete[vdu_id_ref]
569 # check all operations are done
570 if vdu_create or vdu_delete:
571 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
572 vdu_create))
573 if vdu_delete:
574 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
575 vdu_delete))
576
577 vnfr_update = {"vdur": vdurs}
578 db_vnfr["vdur"] = vdurs
579 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
580
581 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
582 """
583 Updates database nsr with the RO info for the created vld
584 :param ns_update_nsr: dictionary to be filled with the updated info
585 :param db_nsr: content of db_nsr. This is also modified
586 :param nsr_desc_RO: nsr descriptor from RO
587 :return: Nothing, LcmException is raised on errors
588 """
589
590 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
591 for net_RO in get_iterable(nsr_desc_RO, "nets"):
592 if vld["id"] != net_RO.get("ns_net_osm_id"):
593 continue
594 vld["vim-id"] = net_RO.get("vim_net_id")
595 vld["name"] = net_RO.get("vim_name")
596 vld["status"] = net_RO.get("status")
597 vld["status-detailed"] = net_RO.get("error_msg")
598 ns_update_nsr["vld.{}".format(vld_index)] = vld
599 break
600 else:
601 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
602
603 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
604 """
605 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
606 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
607 :param nsr_desc_RO: nsr descriptor from RO
608 :return: Nothing, LcmException is raised on errors
609 """
610 for vnf_index, db_vnfr in db_vnfrs.items():
611 for vnf_RO in nsr_desc_RO["vnfs"]:
612 if vnf_RO["member_vnf_index"] != vnf_index:
613 continue
614 vnfr_update = {}
615 if vnf_RO.get("ip_address"):
616 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
617 elif not db_vnfr.get("ip-address"):
618 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
619
620 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
621 vdur_RO_count_index = 0
622 if vdur.get("pdu-type"):
623 continue
624 for vdur_RO in get_iterable(vnf_RO, "vms"):
625 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
626 continue
627 if vdur["count-index"] != vdur_RO_count_index:
628 vdur_RO_count_index += 1
629 continue
630 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
631 if vdur_RO.get("ip_address"):
632 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
633 else:
634 vdur["ip-address"] = None
635 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
636 vdur["name"] = vdur_RO.get("vim_name")
637 vdur["status"] = vdur_RO.get("status")
638 vdur["status-detailed"] = vdur_RO.get("error_msg")
639 for ifacer in get_iterable(vdur, "interfaces"):
640 for interface_RO in get_iterable(vdur_RO, "interfaces"):
641 if ifacer["name"] == interface_RO.get("internal_name"):
642 ifacer["ip-address"] = interface_RO.get("ip_address")
643 ifacer["mac-address"] = interface_RO.get("mac_address")
644 break
645 else:
646 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
647 "from VIM info"
648 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
649 vnfr_update["vdur.{}".format(vdu_index)] = vdur
650 break
651 else:
652 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
653 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
654
655 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
656 for net_RO in get_iterable(nsr_desc_RO, "nets"):
657 if vld["id"] != net_RO.get("vnf_net_osm_id"):
658 continue
659 vld["vim-id"] = net_RO.get("vim_net_id")
660 vld["name"] = net_RO.get("vim_name")
661 vld["status"] = net_RO.get("status")
662 vld["status-detailed"] = net_RO.get("error_msg")
663 vnfr_update["vld.{}".format(vld_index)] = vld
664 break
665 else:
666 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
667 vnf_index, vld["id"]))
668
669 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
670 break
671
672 else:
673 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
674
675 def _get_ns_config_info(self, nsr_id):
676 """
677 Generates a mapping between vnf,vdu elements and the N2VC id
678 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
679 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
680 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
681 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
682 """
683 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
684 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
685 mapping = {}
686 ns_config_info = {"osm-config-mapping": mapping}
687 for vca in vca_deployed_list:
688 if not vca["member-vnf-index"]:
689 continue
690 if not vca["vdu_id"]:
691 mapping[vca["member-vnf-index"]] = vca["application"]
692 else:
693 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
694 vca["application"]
695 return ns_config_info
696
697 @staticmethod
698 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
699 """
700 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
701 primitives as verify-ssh-credentials, or config when needed
702 :param desc_primitive_list: information of the descriptor
703 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
704 this element contains a ssh public key
705 :return: The modified list. Can ba an empty list, but always a list
706 """
707 if desc_primitive_list:
708 primitive_list = desc_primitive_list.copy()
709 else:
710 primitive_list = []
711 # look for primitive config, and get the position. None if not present
712 config_position = None
713 for index, primitive in enumerate(primitive_list):
714 if primitive["name"] == "config":
715 config_position = index
716 break
717
718 # for NS, add always a config primitive if not present (bug 874)
719 if not vca_deployed["member-vnf-index"] and config_position is None:
720 primitive_list.insert(0, {"name": "config", "parameter": []})
721 config_position = 0
722 # for VNF/VDU add verify-ssh-credentials after config
723 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
724 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
725 return primitive_list
726
727 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
728 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
729
730 db_nsr_update = {}
731 RO_descriptor_number = 0 # number of descriptors created at RO
732 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
733 start_deploy = time()
734 vdu_flag = False # If any of the VNFDs has VDUs
735 ns_params = db_nslcmop.get("operationParams")
736 if ns_params and ns_params.get("timeout_ns_deploy"):
737 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
738 else:
739 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
740
741 # Check for and optionally request placement optimization. Database will be updated if placement activated
742 await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
743
744 # deploy RO
745
746 # get vnfds, instantiate at RO
747
748 for c_vnf in nsd.get("constituent-vnfd", ()):
749 member_vnf_index = c_vnf["member-vnf-index"]
750 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
751 if vnfd.get("vdu"):
752 vdu_flag = True
753 vnfd_ref = vnfd["id"]
754 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
755 " RO".format(vnfd_ref, member_vnf_index)
756 # self.logger.debug(logging_text + step)
757 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
758 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
759 RO_descriptor_number += 1
760
761 # look position at deployed.RO.vnfd if not present it will be appended at the end
762 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
763 if vnf_deployed["member-vnf-index"] == member_vnf_index:
764 break
765 else:
766 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
767 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
768
769 # look if present
770 RO_update = {"member-vnf-index": member_vnf_index}
771 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
772 if vnfd_list:
773 RO_update["id"] = vnfd_list[0]["uuid"]
774 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
775 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
776 else:
777 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
778 get("additionalParamsForVnf"), nsr_id)
779 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
780 RO_update["id"] = desc["uuid"]
781 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
782 vnfd_ref, member_vnf_index, desc["uuid"]))
783 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
784 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
785 self.update_db_2("nsrs", nsr_id, db_nsr_update)
786
787 # create nsd at RO
788 nsd_ref = nsd["id"]
789 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
790 # self.logger.debug(logging_text + step)
791
792 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
793 RO_descriptor_number += 1
794 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
795 if nsd_list:
796 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
797 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
798 nsd_ref, RO_nsd_uuid))
799 else:
800 nsd_RO = deepcopy(nsd)
801 nsd_RO["id"] = RO_osm_nsd_id
802 nsd_RO.pop("_id", None)
803 nsd_RO.pop("_admin", None)
804 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
805 member_vnf_index = c_vnf["member-vnf-index"]
806 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
807 for c_vld in nsd_RO.get("vld", ()):
808 for cp in c_vld.get("vnfd-connection-point-ref", ()):
809 member_vnf_index = cp["member-vnf-index-ref"]
810 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
811
812 desc = await self.RO.create("nsd", descriptor=nsd_RO)
813 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
814 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
815 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
816 self.update_db_2("nsrs", nsr_id, db_nsr_update)
817
818 # Crate ns at RO
819 # if present use it unless in error status
820 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
821 if RO_nsr_id:
822 try:
823 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
824 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
825 desc = await self.RO.show("ns", RO_nsr_id)
826
827 except ROclient.ROClientException as e:
828 if e.http_code != HTTPStatus.NOT_FOUND:
829 raise
830 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
831 if RO_nsr_id:
832 ns_status, ns_status_info = self.RO.check_ns_status(desc)
833 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
834 if ns_status == "ERROR":
835 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
836 .format(RO_nsr_id)
837 self.logger.debug(logging_text + step)
838 await self.RO.delete("ns", RO_nsr_id)
839 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
840 if not RO_nsr_id:
841 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
842 # self.logger.debug(logging_text + step)
843
844 # check if VIM is creating and wait look if previous tasks in process
845 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
846 if task_dependency:
847 step = "Waiting for related tasks to be completed: {}".format(task_name)
848 self.logger.debug(logging_text + step)
849 await asyncio.wait(task_dependency, timeout=3600)
850 if ns_params.get("vnf"):
851 for vnf in ns_params["vnf"]:
852 if "vimAccountId" in vnf:
853 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
854 vnf["vimAccountId"])
855 if task_dependency:
856 step = "Waiting for related tasks to be completed: {}".format(task_name)
857 self.logger.debug(logging_text + step)
858 await asyncio.wait(task_dependency, timeout=3600)
859
860 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
861
862 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
863
864 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
865 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
866 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
867 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
868 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
869 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
870 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
871 self.update_db_2("nsrs", nsr_id, db_nsr_update)
872
873 # wait until NS is ready
874 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
875 detailed_status_old = None
876 self.logger.debug(logging_text + step)
877
878 old_desc = None
879 while time() <= start_deploy + timeout_ns_deploy:
880 desc = await self.RO.show("ns", RO_nsr_id)
881
882 # deploymentStatus
883 if desc != old_desc:
884 # desc has changed => update db
885 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
886 old_desc = desc
887
888 ns_status, ns_status_info = self.RO.check_ns_status(desc)
889 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
890 if ns_status == "ERROR":
891 raise ROclient.ROClientException(ns_status_info)
892 elif ns_status == "BUILD":
893 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
894 elif ns_status == "ACTIVE":
895 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
896 try:
897 if vdu_flag:
898 self.ns_update_vnfr(db_vnfrs, desc)
899 break
900 except LcmExceptionNoMgmtIP:
901 pass
902 else:
903 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
904 if detailed_status != detailed_status_old:
905 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
906 self.update_db_2("nsrs", nsr_id, db_nsr_update)
907 await asyncio.sleep(5, loop=self.loop)
908 else: # timeout_ns_deploy
909 raise ROclient.ROClientException("Timeout waiting ns to be ready")
910
911 step = "Updating NSR"
912 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
913
914 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
915 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
916 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
917 self.update_db_2("nsrs", nsr_id, db_nsr_update)
918 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
919
920 step = "Deployed at VIM"
921 self.logger.debug(logging_text + step)
922
923 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
924 """
925 Wait for ip addres at RO, and optionally, insert public key in virtual machine
926 :param logging_text: prefix use for logging
927 :param nsr_id:
928 :param vnfr_id:
929 :param vdu_id:
930 :param vdu_index:
931 :param pub_key: public ssh key to inject, None to skip
932 :param user: user to apply the public ssh key
933 :return: IP address
934 """
935
936 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
937 ro_nsr_id = None
938 ip_address = None
939 nb_tries = 0
940 target_vdu_id = None
941 ro_retries = 0
942
943 while True:
944
945 ro_retries += 1
946 if ro_retries >= 360: # 1 hour
947 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
948
949 await asyncio.sleep(10, loop=self.loop)
950 # wait until NS is deployed at RO
951 if not ro_nsr_id:
952 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
953 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
954 if not ro_nsr_id:
955 continue
956
957 # get ip address
958 if not target_vdu_id:
959 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
960
961 if not vdu_id: # for the VNF case
962 ip_address = db_vnfr.get("ip-address")
963 if not ip_address:
964 continue
965 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
966 else: # VDU case
967 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
968 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
969
970 if not vdur:
971 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
972 vnfr_id, vdu_id, vdu_index
973 ))
974
975 if vdur.get("status") == "ACTIVE":
976 ip_address = vdur.get("ip-address")
977 if not ip_address:
978 continue
979 target_vdu_id = vdur["vdu-id-ref"]
980 elif vdur.get("status") == "ERROR":
981 raise LcmException("Cannot inject ssh-key because target VM is in error state")
982
983 if not target_vdu_id:
984 continue
985
986 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
987
988 # inject public key into machine
989 if pub_key and user:
990 # self.logger.debug(logging_text + "Inserting RO key")
991 try:
992 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
993 result_dict = await self.RO.create_action(
994 item="ns",
995 item_id_name=ro_nsr_id,
996 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
997 )
998 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
999 if not result_dict or not isinstance(result_dict, dict):
1000 raise LcmException("Unknown response from RO when injecting key")
1001 for result in result_dict.values():
1002 if result.get("vim_result") == 200:
1003 break
1004 else:
1005 raise ROclient.ROClientException("error injecting key: {}".format(
1006 result.get("description")))
1007 break
1008 except ROclient.ROClientException as e:
1009 if not nb_tries:
1010 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1011 format(e, 20*10))
1012 nb_tries += 1
1013 if nb_tries >= 20:
1014 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1015 else:
1016 break
1017
1018 return ip_address
1019
1020 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1021 """
1022 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1023 """
1024 my_vca = vca_deployed_list[vca_index]
1025 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1026 # vdu or kdu: no dependencies
1027 return
1028 timeout = 300
1029 while timeout >= 0:
1030 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1031 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1032 configuration_status_list = db_nsr["configurationStatus"]
1033 for index, vca_deployed in enumerate(configuration_status_list):
1034 if index == vca_index:
1035 # myself
1036 continue
1037 if not my_vca.get("member-vnf-index") or \
1038 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1039 internal_status = configuration_status_list[index].get("status")
1040 if internal_status == 'READY':
1041 continue
1042 elif internal_status == 'BROKEN':
1043 raise LcmException("Configuration aborted because dependent charm/s has failed")
1044 else:
1045 break
1046 else:
1047 # no dependencies, return
1048 return
1049 await asyncio.sleep(10)
1050 timeout -= 1
1051
1052 raise LcmException("Configuration aborted because dependent charm/s timeout")
1053
1054 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
1055 vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
1056 nsr_id = db_nsr["_id"]
1057 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1058 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1059 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1060 db_dict = {
1061 'collection': 'nsrs',
1062 'filter': {'_id': nsr_id},
1063 'path': db_update_entry
1064 }
1065 step = ""
1066 try:
1067
1068 element_type = 'NS'
1069 element_under_configuration = nsr_id
1070
1071 vnfr_id = None
1072 if db_vnfr:
1073 vnfr_id = db_vnfr["_id"]
1074
1075 namespace = "{nsi}.{ns}".format(
1076 nsi=nsi_id if nsi_id else "",
1077 ns=nsr_id)
1078
1079 if vnfr_id:
1080 element_type = 'VNF'
1081 element_under_configuration = vnfr_id
1082 namespace += ".{}".format(vnfr_id)
1083 if vdu_id:
1084 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1085 element_type = 'VDU'
1086 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1087
1088 # Get artifact path
1089 self.fs.sync() # Sync from FSMongo
1090 artifact_path = "{}/{}/charms/{}".format(
1091 base_folder["folder"],
1092 base_folder["pkg-dir"],
1093 config_descriptor["juju"]["charm"]
1094 )
1095
1096 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1097 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1098 is_proxy_charm = False
1099
1100 # n2vc_redesign STEP 3.1
1101
1102 # find old ee_id if exists
1103 ee_id = vca_deployed.get("ee_id")
1104
1105 # create or register execution environment in VCA
1106 if is_proxy_charm:
1107
1108 self._write_configuration_status(
1109 nsr_id=nsr_id,
1110 vca_index=vca_index,
1111 status='CREATING',
1112 element_under_configuration=element_under_configuration,
1113 element_type=element_type
1114 )
1115
1116 step = "create execution environment"
1117 self.logger.debug(logging_text + step)
1118 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1119 reuse_ee_id=ee_id,
1120 db_dict=db_dict)
1121
1122 else:
1123 step = "Waiting to VM being up and getting IP address"
1124 self.logger.debug(logging_text + step)
1125 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1126 user=None, pub_key=None)
1127 credentials = {"hostname": rw_mgmt_ip}
1128 # get username
1129 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1130 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1131 # merged. Meanwhile let's get username from initial-config-primitive
1132 if not username and config_descriptor.get("initial-config-primitive"):
1133 for config_primitive in config_descriptor["initial-config-primitive"]:
1134 for param in config_primitive.get("parameter", ()):
1135 if param["name"] == "ssh-username":
1136 username = param["value"]
1137 break
1138 if not username:
1139 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1140 "'config-access.ssh-access.default-user'")
1141 credentials["username"] = username
1142 # n2vc_redesign STEP 3.2
1143
1144 self._write_configuration_status(
1145 nsr_id=nsr_id,
1146 vca_index=vca_index,
1147 status='REGISTERING',
1148 element_under_configuration=element_under_configuration,
1149 element_type=element_type
1150 )
1151
1152 step = "register execution environment {}".format(credentials)
1153 self.logger.debug(logging_text + step)
1154 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1155 db_dict=db_dict)
1156
1157 # for compatibility with MON/POL modules, the need model and application name at database
1158 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1159 ee_id_parts = ee_id.split('.')
1160 model_name = ee_id_parts[0]
1161 application_name = ee_id_parts[1]
1162 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1163 db_update_entry + "application": application_name,
1164 db_update_entry + "ee_id": ee_id})
1165
1166 # n2vc_redesign STEP 3.3
1167
1168 step = "Install configuration Software"
1169
1170 self._write_configuration_status(
1171 nsr_id=nsr_id,
1172 vca_index=vca_index,
1173 status='INSTALLING SW',
1174 element_under_configuration=element_under_configuration,
1175 element_type=element_type
1176 )
1177
1178 # TODO check if already done
1179 self.logger.debug(logging_text + step)
1180 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1181
1182 # write in db flag of configuration_sw already installed
1183 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1184
1185 # add relations for this VCA (wait for other peers related with this VCA)
1186 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)
1187
1188 # if SSH access is required, then get execution environment SSH public
1189 if is_proxy_charm: # if native charm we have waited already to VM be UP
1190 pub_key = None
1191 user = None
1192 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1193 # Needed to inject a ssh key
1194 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1195 step = "Install configuration Software, getting public ssh key"
1196 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1197
1198 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1199 else:
1200 step = "Waiting to VM being up and getting IP address"
1201 self.logger.debug(logging_text + step)
1202
1203 # n2vc_redesign STEP 5.1
1204 # wait for RO (ip-address) Insert pub_key into VM
1205 if vnfr_id:
1206 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1207 user=user, pub_key=pub_key)
1208 else:
1209 rw_mgmt_ip = None # This is for a NS configuration
1210
1211 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1212
1213 # store rw_mgmt_ip in deploy params for later replacement
1214 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1215
1216 # n2vc_redesign STEP 6 Execute initial config primitive
1217 step = 'execute initial config primitive'
1218 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1219
1220 # sort initial config primitives by 'seq'
1221 if initial_config_primitive_list:
1222 try:
1223 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1224 except Exception as e:
1225 self.logger.error(logging_text + step + ": " + str(e))
1226 else:
1227 self.logger.debug(logging_text + step + ": No initial-config-primitive")
1228
1229 # add config if not present for NS charm
1230 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1231 vca_deployed)
1232
1233 # wait for dependent primitives execution (NS -> VNF -> VDU)
1234 if initial_config_primitive_list:
1235 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1236
1237 # stage, in function of element type: vdu, kdu, vnf or ns
1238 my_vca = vca_deployed_list[vca_index]
1239 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1240 # VDU or KDU
1241 stage = 'Stage 3/5: running Day-1 primitives for VDU'
1242 elif my_vca.get("member-vnf-index"):
1243 # VNF
1244 stage = 'Stage 4/5: running Day-1 primitives for VNF'
1245 else:
1246 # NS
1247 stage = 'Stage 5/5: running Day-1 primitives for NS'
1248
1249 self._write_configuration_status(
1250 nsr_id=nsr_id,
1251 vca_index=vca_index,
1252 status='EXECUTING PRIMITIVE'
1253 )
1254
1255 self._write_op_status(
1256 op_id=nslcmop_id,
1257 stage=stage
1258 )
1259
1260 for initial_config_primitive in initial_config_primitive_list:
1261 # adding information on the vca_deployed if it is a NS execution environment
1262 if not vca_deployed["member-vnf-index"]:
1263 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1264 # TODO check if already done
1265 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1266
1267 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1268 self.logger.debug(logging_text + step)
1269 await self.n2vc.exec_primitive(
1270 ee_id=ee_id,
1271 primitive_name=initial_config_primitive["name"],
1272 params_dict=primitive_params_,
1273 db_dict=db_dict
1274 )
1275
1276 # TODO register in database that primitive is done
1277
1278 step = "instantiated at VCA"
1279 self.logger.debug(logging_text + step)
1280
1281 self._write_configuration_status(
1282 nsr_id=nsr_id,
1283 vca_index=vca_index,
1284 status='READY'
1285 )
1286
1287 except Exception as e: # TODO not use Exception but N2VC exception
1288 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1289 self._write_configuration_status(
1290 nsr_id=nsr_id,
1291 vca_index=vca_index,
1292 status='BROKEN'
1293 )
1294 raise Exception("{} {}".format(step, e)) from e
1295 # TODO raise N2VC exception with 'step' extra information
1296
1297 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1298 error_description: str = None):
1299 try:
1300 db_dict = dict()
1301 if ns_state:
1302 db_dict["nsState"] = ns_state
1303 db_dict["currentOperation"] = current_operation
1304 db_dict["currentOperationID"] = current_operation_id
1305 db_dict["errorDescription"] = error_description
1306 self.update_db_2("nsrs", nsr_id, db_dict)
1307 except Exception as e:
1308 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1309
1310 def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
1311 try:
1312 db_dict = dict()
1313 db_dict['queuePosition'] = queuePosition
1314 db_dict['stage'] = stage
1315 if error_message:
1316 db_dict['errorMessage'] = error_message
1317 self.update_db_2("nslcmops", op_id, db_dict)
1318 except Exception as e:
1319 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1320
1321 def _write_all_config_status(self, nsr_id: str, status: str):
1322 try:
1323 # nsrs record
1324 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1325 # configurationStatus
1326 config_status = db_nsr.get('configurationStatus')
1327 if config_status:
1328 # update status
1329 db_dict = dict()
1330 db_dict['configurationStatus'] = list()
1331 for c in config_status:
1332 c['status'] = status
1333 db_dict['configurationStatus'].append(c)
1334 self.update_db_2("nsrs", nsr_id, db_dict)
1335
1336 except Exception as e:
1337 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1338
1339 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1340 element_under_configuration: str = None, element_type: str = None):
1341
1342 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1343 # .format(vca_index, status))
1344
1345 try:
1346 db_path = 'configurationStatus.{}.'.format(vca_index)
1347 db_dict = dict()
1348 if status:
1349 db_dict[db_path + 'status'] = status
1350 if element_under_configuration:
1351 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1352 if element_type:
1353 db_dict[db_path + 'elementType'] = element_type
1354 self.update_db_2("nsrs", nsr_id, db_dict)
1355 except Exception as e:
1356 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1357 .format(status, nsr_id, vca_index, e))
1358
1359 async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1360 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1361 if placement_engine == "PLA":
1362 self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
1363 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
1364 db_poll_interval = 5
1365 wait = db_poll_interval * 4
1366 pla_result = None
1367 while not pla_result and wait >= 0:
1368 await asyncio.sleep(db_poll_interval)
1369 wait -= db_poll_interval
1370 db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
1371 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1372
1373 if not pla_result:
1374 raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
1375
1376 for pla_vnf in pla_result['vnf']:
1377 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1378 if not pla_vnf.get('vimAccountId') or not vnfr:
1379 continue
1380 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1381 return
1382
1383 def update_nsrs_with_pla_result(self, params):
1384 try:
1385 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1386 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1387 except Exception as e:
1388 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1389
1390 async def instantiate(self, nsr_id, nslcmop_id):
1391 """
1392
1393 :param nsr_id: ns instance to deploy
1394 :param nslcmop_id: operation to run
1395 :return:
1396 """
1397
1398 # Try to lock HA task here
1399 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1400 if not task_is_locked_by_me:
1401 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1402 return
1403
1404 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1405 self.logger.debug(logging_text + "Enter")
1406
1407 # get all needed from database
1408
1409 # database nsrs record
1410 db_nsr = None
1411
1412 # database nslcmops record
1413 db_nslcmop = None
1414
1415 # update operation on nsrs
1416 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1417 "_admin.current-operation": nslcmop_id,
1418 "_admin.operation-type": "instantiate"}
1419 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1420
1421 # update operation on nslcmops
1422 db_nslcmop_update = {}
1423
1424 nslcmop_operation_state = None
1425 db_vnfrs = {} # vnf's info indexed by member-index
1426 # n2vc_info = {}
1427 task_instantiation_list = []
1428 task_instantiation_info = {} # from task to info text
1429 exc = None
1430 try:
1431 # wait for any previous tasks in process
1432 step = "Waiting for previous operations to terminate"
1433 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1434
1435 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1436
1437 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1438 self._write_ns_status(
1439 nsr_id=nsr_id,
1440 ns_state="BUILDING",
1441 current_operation="INSTANTIATING",
1442 current_operation_id=nslcmop_id
1443 )
1444
1445 # read from db: operation
1446 step = "Getting nslcmop={} from db".format(nslcmop_id)
1447 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1448 ns_params = db_nslcmop.get("operationParams")
1449 if ns_params and ns_params.get("timeout_ns_deploy"):
1450 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1451 else:
1452 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1453
1454 # read from db: ns
1455 step = "Getting nsr={} from db".format(nsr_id)
1456 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1457 # nsd is replicated into ns (no db read)
1458 nsd = db_nsr["nsd"]
1459 # nsr_name = db_nsr["name"] # TODO short-name??
1460
1461 # read from db: vnf's of this ns
1462 step = "Getting vnfrs from db"
1463 self.logger.debug(logging_text + step)
1464 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1465
1466 # read from db: vnfd's for every vnf
1467 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1468 db_vnfds = {} # every vnfd data indexed by vnf id
1469 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1470
1471 self._write_op_status(
1472 op_id=nslcmop_id,
1473 stage='Stage 1/5: preparation of the environment',
1474 queuePosition=0
1475 )
1476
1477 # for each vnf in ns, read vnfd
1478 for vnfr in db_vnfrs_list:
1479 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1480 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1481 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1482 # if we haven't this vnfd, read it from db
1483 if vnfd_id not in db_vnfds:
1484 # read from db
1485 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1486 self.logger.debug(logging_text + step)
1487 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1488
1489 # store vnfd
1490 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1491 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1492 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1493
1494 # Get or generates the _admin.deployed.VCA list
1495 vca_deployed_list = None
1496 if db_nsr["_admin"].get("deployed"):
1497 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1498 if vca_deployed_list is None:
1499 vca_deployed_list = []
1500 configuration_status_list = []
1501 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1502 db_nsr_update["configurationStatus"] = configuration_status_list
1503 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1504 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1505 elif isinstance(vca_deployed_list, dict):
1506 # maintain backward compatibility. Change a dict to list at database
1507 vca_deployed_list = list(vca_deployed_list.values())
1508 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1509 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1510
1511 db_nsr_update["detailed-status"] = "creating"
1512 db_nsr_update["operational-status"] = "init"
1513
1514 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1515 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1516 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1517
1518 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1519 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1520 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1521
1522 # n2vc_redesign STEP 2 Deploy Network Scenario
1523
1524 self._write_op_status(
1525 op_id=nslcmop_id,
1526 stage='Stage 2/5: deployment of VMs and execution environments'
1527 )
1528
1529 self.logger.debug(logging_text + "Before deploy_kdus")
1530 # Call to deploy_kdus in case exists the "vdu:kdu" param
1531 task_kdu = asyncio.ensure_future(
1532 self.deploy_kdus(
1533 logging_text=logging_text,
1534 nsr_id=nsr_id,
1535 db_nsr=db_nsr,
1536 db_vnfrs=db_vnfrs,
1537 db_vnfds=db_vnfds
1538 )
1539 )
1540 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1541 task_instantiation_info[task_kdu] = "Deploy KDUs"
1542 task_instantiation_list.append(task_kdu)
1543 # n2vc_redesign STEP 1 Get VCA public ssh-key
1544 # feature 1429. Add n2vc public key to needed VMs
1545 n2vc_key = self.n2vc.get_public_key()
1546 n2vc_key_list = [n2vc_key]
1547 if self.vca_config.get("public_key"):
1548 n2vc_key_list.append(self.vca_config["public_key"])
1549
1550 task_ro = asyncio.ensure_future(
1551 self.instantiate_RO(
1552 logging_text=logging_text,
1553 nsr_id=nsr_id,
1554 nsd=nsd,
1555 db_nsr=db_nsr,
1556 db_nslcmop=db_nslcmop,
1557 db_vnfrs=db_vnfrs,
1558 db_vnfds_ref=db_vnfds_ref,
1559 n2vc_key_list=n2vc_key_list
1560 )
1561 )
1562 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1563 task_instantiation_info[task_ro] = "Deploy at VIM"
1564 task_instantiation_list.append(task_ro)
1565
1566 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1567 step = "Deploying proxy and native charms"
1568 self.logger.debug(logging_text + step)
1569
1570 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1571 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1572 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1573 vnfd_id = c_vnf["vnfd-id-ref"]
1574 vnfd = db_vnfds_ref[vnfd_id]
1575 member_vnf_index = str(c_vnf["member-vnf-index"])
1576 db_vnfr = db_vnfrs[member_vnf_index]
1577 base_folder = vnfd["_admin"]["storage"]
1578 vdu_id = None
1579 vdu_index = 0
1580 vdu_name = None
1581 kdu_name = None
1582
1583 # Get additional parameters
1584 deploy_params = {}
1585 if db_vnfr.get("additionalParamsForVnf"):
1586 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1587
1588 descriptor_config = vnfd.get("vnf-configuration")
1589 if descriptor_config and descriptor_config.get("juju"):
1590 self._deploy_n2vc(
1591 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1592 db_nsr=db_nsr,
1593 db_vnfr=db_vnfr,
1594 nslcmop_id=nslcmop_id,
1595 nsr_id=nsr_id,
1596 nsi_id=nsi_id,
1597 vnfd_id=vnfd_id,
1598 vdu_id=vdu_id,
1599 kdu_name=kdu_name,
1600 member_vnf_index=member_vnf_index,
1601 vdu_index=vdu_index,
1602 vdu_name=vdu_name,
1603 deploy_params=deploy_params,
1604 descriptor_config=descriptor_config,
1605 base_folder=base_folder,
1606 task_instantiation_list=task_instantiation_list,
1607 task_instantiation_info=task_instantiation_info
1608 )
1609
1610 # Deploy charms for each VDU that supports one.
1611 for vdud in get_iterable(vnfd, 'vdu'):
1612 vdu_id = vdud["id"]
1613 descriptor_config = vdud.get('vdu-configuration')
1614 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1615 if vdur.get("additionalParams"):
1616 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1617 else:
1618 deploy_params_vdu = deploy_params
1619 if descriptor_config and descriptor_config.get("juju"):
1620 # look for vdu index in the db_vnfr["vdu"] section
1621 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1622 # if vdur["vdu-id-ref"] == vdu_id:
1623 # break
1624 # else:
1625 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1626 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1627 # vdu_name = vdur.get("name")
1628 vdu_name = None
1629 kdu_name = None
1630 for vdu_index in range(int(vdud.get("count", 1))):
1631 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1632 self._deploy_n2vc(
1633 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1634 member_vnf_index, vdu_id, vdu_index),
1635 db_nsr=db_nsr,
1636 db_vnfr=db_vnfr,
1637 nslcmop_id=nslcmop_id,
1638 nsr_id=nsr_id,
1639 nsi_id=nsi_id,
1640 vnfd_id=vnfd_id,
1641 vdu_id=vdu_id,
1642 kdu_name=kdu_name,
1643 member_vnf_index=member_vnf_index,
1644 vdu_index=vdu_index,
1645 vdu_name=vdu_name,
1646 deploy_params=deploy_params_vdu,
1647 descriptor_config=descriptor_config,
1648 base_folder=base_folder,
1649 task_instantiation_list=task_instantiation_list,
1650 task_instantiation_info=task_instantiation_info
1651 )
1652 for kdud in get_iterable(vnfd, 'kdu'):
1653 kdu_name = kdud["name"]
1654 descriptor_config = kdud.get('kdu-configuration')
1655 if descriptor_config and descriptor_config.get("juju"):
1656 vdu_id = None
1657 vdu_index = 0
1658 vdu_name = None
1659 # look for vdu index in the db_vnfr["vdu"] section
1660 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1661 # if vdur["vdu-id-ref"] == vdu_id:
1662 # break
1663 # else:
1664 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1665 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1666 # vdu_name = vdur.get("name")
1667 # vdu_name = None
1668
1669 self._deploy_n2vc(
1670 logging_text=logging_text,
1671 db_nsr=db_nsr,
1672 db_vnfr=db_vnfr,
1673 nslcmop_id=nslcmop_id,
1674 nsr_id=nsr_id,
1675 nsi_id=nsi_id,
1676 vnfd_id=vnfd_id,
1677 vdu_id=vdu_id,
1678 kdu_name=kdu_name,
1679 member_vnf_index=member_vnf_index,
1680 vdu_index=vdu_index,
1681 vdu_name=vdu_name,
1682 deploy_params=deploy_params,
1683 descriptor_config=descriptor_config,
1684 base_folder=base_folder,
1685 task_instantiation_list=task_instantiation_list,
1686 task_instantiation_info=task_instantiation_info
1687 )
1688
1689 # Check if this NS has a charm configuration
1690 descriptor_config = nsd.get("ns-configuration")
1691 if descriptor_config and descriptor_config.get("juju"):
1692 vnfd_id = None
1693 db_vnfr = None
1694 member_vnf_index = None
1695 vdu_id = None
1696 kdu_name = None
1697 vdu_index = 0
1698 vdu_name = None
1699
1700 # Get additional parameters
1701 deploy_params = {}
1702 if db_nsr.get("additionalParamsForNs"):
1703 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1704 base_folder = nsd["_admin"]["storage"]
1705 self._deploy_n2vc(
1706 logging_text=logging_text,
1707 db_nsr=db_nsr,
1708 db_vnfr=db_vnfr,
1709 nslcmop_id=nslcmop_id,
1710 nsr_id=nsr_id,
1711 nsi_id=nsi_id,
1712 vnfd_id=vnfd_id,
1713 vdu_id=vdu_id,
1714 kdu_name=kdu_name,
1715 member_vnf_index=member_vnf_index,
1716 vdu_index=vdu_index,
1717 vdu_name=vdu_name,
1718 deploy_params=deploy_params,
1719 descriptor_config=descriptor_config,
1720 base_folder=base_folder,
1721 task_instantiation_list=task_instantiation_list,
1722 task_instantiation_info=task_instantiation_info
1723 )
1724
1725 # Wait until all tasks of "task_instantiation_list" have been finished
1726
1727 error_text_list = []
1728
1729 # let's begin with all OK
1730 instantiated_ok = True
1731 # let's begin with RO 'running' status (later we can change it)
1732 db_nsr_update["operational-status"] = "running"
1733 # let's begin with VCA 'configured' status (later we can change it)
1734 db_nsr_update["config-status"] = "configured"
1735
1736 step = "Waiting for tasks to be finished"
1737 if task_instantiation_list:
1738 # wait for all tasks completion
1739 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
1740
1741 for task in pending:
1742 instantiated_ok = False
1743 if task in (task_ro, task_kdu):
1744 # RO or KDU task is pending
1745 db_nsr_update["operational-status"] = "failed"
1746 else:
1747 # A N2VC task is pending
1748 db_nsr_update["config-status"] = "failed"
1749 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1750 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1751 for task in done:
1752 if task.cancelled():
1753 instantiated_ok = False
1754 if task in (task_ro, task_kdu):
1755 # RO or KDU task was cancelled
1756 db_nsr_update["operational-status"] = "failed"
1757 else:
1758 # A N2VC was cancelled
1759 db_nsr_update["config-status"] = "failed"
1760 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1761 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1762 else:
1763 exc = task.exception()
1764 if exc:
1765 instantiated_ok = False
1766 if task in (task_ro, task_kdu):
1767 # RO or KDU task raised an exception
1768 db_nsr_update["operational-status"] = "failed"
1769 else:
1770 # A N2VC task raised an exception
1771 db_nsr_update["config-status"] = "failed"
1772 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1773
1774 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1775 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1776 else:
1777 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1778 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1779 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1780 else:
1781 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1782
1783 if error_text_list:
1784 error_text = "\n".join(error_text_list)
1785 db_nsr_update["detailed-status"] = error_text
1786 db_nslcmop_update["detailed-status"] = error_text
1787 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1788 db_nslcmop_update["statusEnteredTime"] = time()
1789 else:
1790 # all is done
1791 db_nsr_update["detailed-status"] = "done"
1792 db_nslcmop_update["detailed-status"] = "done"
1793 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1794 db_nslcmop_update["statusEnteredTime"] = time()
1795
1796 except (ROclient.ROClientException, DbException, LcmException) as e:
1797 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1798 exc = e
1799 except asyncio.CancelledError:
1800 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1801 exc = "Operation was cancelled"
1802 except Exception as e:
1803 exc = traceback.format_exc()
1804 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1805 exc_info=True)
1806 finally:
1807 if exc:
1808 if db_nsr:
1809 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1810 db_nsr_update["operational-status"] = "failed"
1811 db_nsr_update["config-status"] = "failed"
1812 if db_nslcmop:
1813 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1814 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1815 db_nslcmop_update["statusEnteredTime"] = time()
1816 try:
1817 if db_nsr:
1818 db_nsr_update["_admin.nslcmop"] = None
1819 db_nsr_update["_admin.current-operation"] = None
1820 db_nsr_update["_admin.operation-type"] = None
1821 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1822
1823 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1824 ns_state = None
1825 error_description = None
1826 if instantiated_ok:
1827 ns_state = "READY"
1828 else:
1829 ns_state = "BROKEN"
1830 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1831
1832 self._write_ns_status(
1833 nsr_id=nsr_id,
1834 ns_state=ns_state,
1835 current_operation="IDLE",
1836 current_operation_id=None,
1837 error_description=error_description
1838 )
1839
1840 self._write_op_status(
1841 op_id=nslcmop_id,
1842 error_message=error_description
1843 )
1844
1845 if db_nslcmop_update:
1846 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1847
1848 self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
1849
1850 except DbException as e:
1851 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1852
1853 if nslcmop_operation_state:
1854 try:
1855 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1856 "operationState": nslcmop_operation_state},
1857 loop=self.loop)
1858 except Exception as e:
1859 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1860
1861 self.logger.debug(logging_text + "Exit")
1862 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1863
1864 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
1865
1866 # steps:
1867 # 1. find all relations for this VCA
1868 # 2. wait for other peers related
1869 # 3. add relations
1870
1871 try:
1872
1873 # STEP 1: find all relations for this VCA
1874
1875 # read nsr record
1876 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1877
1878 # this VCA data
1879 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
1880
1881 # read all ns-configuration relations
1882 ns_relations = list()
1883 db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
1884 if db_ns_relations:
1885 for r in db_ns_relations:
1886 # check if this VCA is in the relation
1887 if my_vca.get('member-vnf-index') in\
1888 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1889 ns_relations.append(r)
1890
1891 # read all vnf-configuration relations
1892 vnf_relations = list()
1893 db_vnfd_list = db_nsr.get('vnfd-id')
1894 if db_vnfd_list:
1895 for vnfd in db_vnfd_list:
1896 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
1897 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
1898 if db_vnf_relations:
1899 for r in db_vnf_relations:
1900 # check if this VCA is in the relation
1901 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
1902 vnf_relations.append(r)
1903
1904 # if no relations, terminate
1905 if not ns_relations and not vnf_relations:
1906 self.logger.debug(logging_text + ' No relations')
1907 return True
1908
1909 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
1910
1911 # add all relations
1912 start = time()
1913 while True:
1914 # check timeout
1915 now = time()
1916 if now - start >= timeout:
1917 self.logger.error(logging_text + ' : timeout adding relations')
1918 return False
1919
1920 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
1921 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1922
1923 # for each defined NS relation, find the VCA's related
1924 for r in ns_relations:
1925 from_vca_ee_id = None
1926 to_vca_ee_id = None
1927 from_vca_endpoint = None
1928 to_vca_endpoint = None
1929 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
1930 for vca in vca_list:
1931 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
1932 and vca.get('config_sw_installed'):
1933 from_vca_ee_id = vca.get('ee_id')
1934 from_vca_endpoint = r.get('entities')[0].get('endpoint')
1935 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
1936 and vca.get('config_sw_installed'):
1937 to_vca_ee_id = vca.get('ee_id')
1938 to_vca_endpoint = r.get('entities')[1].get('endpoint')
1939 if from_vca_ee_id and to_vca_ee_id:
1940 # add relation
1941 await self.n2vc.add_relation(
1942 ee_id_1=from_vca_ee_id,
1943 ee_id_2=to_vca_ee_id,
1944 endpoint_1=from_vca_endpoint,
1945 endpoint_2=to_vca_endpoint)
1946 # remove entry from relations list
1947 ns_relations.remove(r)
1948 else:
1949 # check failed peers
1950 try:
1951 vca_status_list = db_nsr.get('configurationStatus')
1952 if vca_status_list:
1953 for i in range(len(vca_list)):
1954 vca = vca_list[i]
1955 vca_status = vca_status_list[i]
1956 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
1957 if vca_status.get('status') == 'BROKEN':
1958 # peer broken: remove relation from list
1959 ns_relations.remove(r)
1960 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
1961 if vca_status.get('status') == 'BROKEN':
1962 # peer broken: remove relation from list
1963 ns_relations.remove(r)
1964 except Exception:
1965 # ignore
1966 pass
1967
1968 # for each defined VNF relation, find the VCA's related
1969 for r in vnf_relations:
1970 from_vca_ee_id = None
1971 to_vca_ee_id = None
1972 from_vca_endpoint = None
1973 to_vca_endpoint = None
1974 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
1975 for vca in vca_list:
1976 if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
1977 from_vca_ee_id = vca.get('ee_id')
1978 from_vca_endpoint = r.get('entities')[0].get('endpoint')
1979 if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
1980 to_vca_ee_id = vca.get('ee_id')
1981 to_vca_endpoint = r.get('entities')[1].get('endpoint')
1982 if from_vca_ee_id and to_vca_ee_id:
1983 # add relation
1984 await self.n2vc.add_relation(
1985 ee_id_1=from_vca_ee_id,
1986 ee_id_2=to_vca_ee_id,
1987 endpoint_1=from_vca_endpoint,
1988 endpoint_2=to_vca_endpoint)
1989 # remove entry from relations list
1990 vnf_relations.remove(r)
1991 else:
1992 # check failed peers
1993 try:
1994 vca_status_list = db_nsr.get('configurationStatus')
1995 if vca_status_list:
1996 for i in range(len(vca_list)):
1997 vca = vca_list[i]
1998 vca_status = vca_status_list[i]
1999 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2000 if vca_status.get('status') == 'BROKEN':
2001 # peer broken: remove relation from list
2002 ns_relations.remove(r)
2003 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2004 if vca_status.get('status') == 'BROKEN':
2005 # peer broken: remove relation from list
2006 ns_relations.remove(r)
2007 except Exception:
2008 # ignore
2009 pass
2010
2011 # wait for next try
2012 await asyncio.sleep(5.0)
2013
2014 if not ns_relations and not vnf_relations:
2015 self.logger.debug('Relations added')
2016 break
2017
2018 return True
2019
2020 except Exception as e:
2021 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2022 return False
2023
2024 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
2025 # Launch kdus if present in the descriptor
2026
2027 deployed_ok = True
2028
2029 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
2030
2031 def _get_cluster_id(cluster_id, cluster_type):
2032 nonlocal k8scluster_id_2_uuic
2033 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2034 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2035
2036 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2037 if not db_k8scluster:
2038 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2039 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2040 if not k8s_id:
2041 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
2042 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2043 return k8s_id
2044
2045 logging_text += "Deploy kdus: "
2046 try:
2047 db_nsr_update = {"_admin.deployed.K8s": []}
2048 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2049
2050 # Look for all vnfds
2051 pending_tasks = {}
2052 index = 0
2053 for vnfr_data in db_vnfrs.values():
2054 for kdur in get_iterable(vnfr_data, "kdur"):
2055 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2056 kdumodel = None
2057 k8sclustertype = None
2058 error_text = None
2059 cluster_uuid = None
2060 vnfd_id = vnfr_data.get('vnfd-id')
2061 pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
2062 if kdur.get("helm-chart"):
2063 kdumodel = kdur["helm-chart"]
2064 k8sclustertype = "chart"
2065 k8sclustertype_full = "helm-chart"
2066 elif kdur.get("juju-bundle"):
2067 kdumodel = kdur["juju-bundle"]
2068 k8sclustertype = "juju"
2069 k8sclustertype_full = "juju-bundle"
2070 else:
2071 error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
2072 " running"
2073 # check if kdumodel is a file and exists
2074 try:
2075 # path format: /vnfdid/pkkdir/kdumodel
2076 filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
2077 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2078 kdumodel = self.fs.path + filename
2079 except Exception:
2080 # it is not a file
2081 pass
2082 try:
2083 if not error_text:
2084 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
2085 except LcmException as e:
2086 error_text = str(e)
2087 deployed_ok = False
2088
2089 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
2090
2091 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
2092 "k8scluster-type": k8sclustertype,
2093 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
2094 if error_text:
2095 k8s_instace_info["detailed-status"] = error_text
2096 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
2097 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2098 if error_text:
2099 continue
2100
2101 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
2102 "{}".format(index)}
2103 if k8sclustertype == "chart":
2104 task = asyncio.ensure_future(
2105 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
2106 params=desc_params, db_dict=db_dict, timeout=3600)
2107 )
2108 else:
2109 task = asyncio.ensure_future(
2110 self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
2111 atomic=True, params=desc_params,
2112 db_dict=db_dict, timeout=600,
2113 kdu_name=kdur["kdu-name"])
2114 )
2115
2116 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
2117 index += 1
2118
2119 if pending_tasks:
2120 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2121 pending_list = list(pending_tasks.keys())
2122 while pending_list:
2123 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
2124 return_when=asyncio.FIRST_COMPLETED)
2125 if not done_list: # timeout
2126 for task in pending_list:
2127 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
2128 deployed_ok = False
2129 break
2130 for task in done_list:
2131 exc = task.exception()
2132 if exc:
2133 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
2134 deployed_ok = False
2135 else:
2136 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
2137
2138 if not deployed_ok:
2139 raise LcmException('Cannot deploy KDUs')
2140
2141 except Exception as e:
2142 msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)
2143 self.logger.error(msg)
2144 raise LcmException(msg)
2145 finally:
2146 if db_nsr_update:
2147 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2148
2149 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2150 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2151 base_folder, task_instantiation_list, task_instantiation_info):
2152 # launch instantiate_N2VC in a asyncio task and register task object
2153 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2154 # if not found, create one entry and update database
2155
2156 # fill db_nsr._admin.deployed.VCA.<index>
2157 vca_index = -1
2158 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2159 if not vca_deployed:
2160 continue
2161 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2162 vca_deployed.get("vdu_id") == vdu_id and \
2163 vca_deployed.get("kdu_name") == kdu_name and \
2164 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2165 break
2166 else:
2167 # not found, create one.
2168 vca_deployed = {
2169 "member-vnf-index": member_vnf_index,
2170 "vdu_id": vdu_id,
2171 "kdu_name": kdu_name,
2172 "vdu_count_index": vdu_index,
2173 "operational-status": "init", # TODO revise
2174 "detailed-status": "", # TODO revise
2175 "step": "initial-deploy", # TODO revise
2176 "vnfd_id": vnfd_id,
2177 "vdu_name": vdu_name,
2178 }
2179 vca_index += 1
2180
2181 # create VCA and configurationStatus in db
2182 db_dict = {
2183 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2184 "configurationStatus.{}".format(vca_index): dict()
2185 }
2186 self.update_db_2("nsrs", nsr_id, db_dict)
2187
2188 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2189
2190 # Launch task
2191 task_n2vc = asyncio.ensure_future(
2192 self.instantiate_N2VC(
2193 logging_text=logging_text,
2194 vca_index=vca_index,
2195 nsi_id=nsi_id,
2196 db_nsr=db_nsr,
2197 db_vnfr=db_vnfr,
2198 vdu_id=vdu_id,
2199 kdu_name=kdu_name,
2200 vdu_index=vdu_index,
2201 deploy_params=deploy_params,
2202 config_descriptor=descriptor_config,
2203 base_folder=base_folder,
2204 nslcmop_id=nslcmop_id
2205 )
2206 )
2207 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2208 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
2209 task_instantiation_list.append(task_n2vc)
2210
2211 # Check if this VNFD has a configured terminate action
2212 def _has_terminate_config_primitive(self, vnfd):
2213 vnf_config = vnfd.get("vnf-configuration")
2214 if vnf_config and vnf_config.get("terminate-config-primitive"):
2215 return True
2216 else:
2217 return False
2218
2219 @staticmethod
2220 def _get_terminate_config_primitive_seq_list(vnfd):
2221 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2222 # No need to check for existing primitive twice, already done before
2223 vnf_config = vnfd.get("vnf-configuration")
2224 seq_list = vnf_config.get("terminate-config-primitive")
2225 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2226 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2227 return seq_list_sorted
2228
2229 @staticmethod
2230 def _create_nslcmop(nsr_id, operation, params):
2231 """
2232 Creates a ns-lcm-opp content to be stored at database.
2233 :param nsr_id: internal id of the instance
2234 :param operation: instantiate, terminate, scale, action, ...
2235 :param params: user parameters for the operation
2236 :return: dictionary following SOL005 format
2237 """
2238 # Raise exception if invalid arguments
2239 if not (nsr_id and operation and params):
2240 raise LcmException(
2241 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2242 now = time()
2243 _id = str(uuid4())
2244 nslcmop = {
2245 "id": _id,
2246 "_id": _id,
2247 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2248 "operationState": "PROCESSING",
2249 "statusEnteredTime": now,
2250 "nsInstanceId": nsr_id,
2251 "lcmOperationType": operation,
2252 "startTime": now,
2253 "isAutomaticInvocation": False,
2254 "operationParams": params,
2255 "isCancelPending": False,
2256 "links": {
2257 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2258 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2259 }
2260 }
2261 return nslcmop
2262
2263 def _format_additional_params(self, params):
2264 params = params or {}
2265 for key, value in params.items():
2266 if str(value).startswith("!!yaml "):
2267 params[key] = yaml.safe_load(value[7:])
2268 return params
2269
2270 def _get_terminate_primitive_params(self, seq, vnf_index):
2271 primitive = seq.get('name')
2272 primitive_params = {}
2273 params = {
2274 "member_vnf_index": vnf_index,
2275 "primitive": primitive,
2276 "primitive_params": primitive_params,
2277 }
2278 desc_params = {}
2279 return self._map_primitive_params(seq, params, desc_params)
2280
2281 # sub-operations
2282
2283 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
2284 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2285 if (op.get('operationState') == 'COMPLETED'):
2286 # b. Skip sub-operation
2287 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2288 return self.SUBOPERATION_STATUS_SKIP
2289 else:
2290 # c. Reintent executing sub-operation
2291 # The sub-operation exists, and operationState != 'COMPLETED'
2292 # Update operationState = 'PROCESSING' to indicate a reintent.
2293 operationState = 'PROCESSING'
2294 detailed_status = 'In progress'
2295 self._update_suboperation_status(
2296 db_nslcmop, op_index, operationState, detailed_status)
2297 # Return the sub-operation index
2298 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2299 # with arguments extracted from the sub-operation
2300 return op_index
2301
2302 # Find a sub-operation where all keys in a matching dictionary must match
2303 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2304 def _find_suboperation(self, db_nslcmop, match):
2305 if (db_nslcmop and match):
2306 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2307 for i, op in enumerate(op_list):
2308 if all(op.get(k) == match[k] for k in match):
2309 return i
2310 return self.SUBOPERATION_STATUS_NOT_FOUND
2311
2312 # Update status for a sub-operation given its index
2313 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2314 # Update DB for HA tasks
2315 q_filter = {'_id': db_nslcmop['_id']}
2316 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2317 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2318 self.db.set_one("nslcmops",
2319 q_filter=q_filter,
2320 update_dict=update_dict,
2321 fail_on_empty=False)
2322
2323 # Add sub-operation, return the index of the added sub-operation
2324 # Optionally, set operationState, detailed-status, and operationType
2325 # Status and type are currently set for 'scale' sub-operations:
2326 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2327 # 'detailed-status' : status message
2328 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2329 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2330 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2331 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2332 RO_nsr_id=None, RO_scaling_info=None):
2333 if not (db_nslcmop):
2334 return self.SUBOPERATION_STATUS_NOT_FOUND
2335 # Get the "_admin.operations" list, if it exists
2336 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2337 op_list = db_nslcmop_admin.get('operations')
2338 # Create or append to the "_admin.operations" list
2339 new_op = {'member_vnf_index': vnf_index,
2340 'vdu_id': vdu_id,
2341 'vdu_count_index': vdu_count_index,
2342 'primitive': primitive,
2343 'primitive_params': mapped_primitive_params}
2344 if operationState:
2345 new_op['operationState'] = operationState
2346 if detailed_status:
2347 new_op['detailed-status'] = detailed_status
2348 if operationType:
2349 new_op['lcmOperationType'] = operationType
2350 if RO_nsr_id:
2351 new_op['RO_nsr_id'] = RO_nsr_id
2352 if RO_scaling_info:
2353 new_op['RO_scaling_info'] = RO_scaling_info
2354 if not op_list:
2355 # No existing operations, create key 'operations' with current operation as first list element
2356 db_nslcmop_admin.update({'operations': [new_op]})
2357 op_list = db_nslcmop_admin.get('operations')
2358 else:
2359 # Existing operations, append operation to list
2360 op_list.append(new_op)
2361
2362 db_nslcmop_update = {'_admin.operations': op_list}
2363 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2364 op_index = len(op_list) - 1
2365 return op_index
2366
2367 # Helper methods for scale() sub-operations
2368
2369 # pre-scale/post-scale:
2370 # Check for 3 different cases:
2371 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2372 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2373 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2374 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2375 operationType, RO_nsr_id=None, RO_scaling_info=None):
2376 # Find this sub-operation
2377 if (RO_nsr_id and RO_scaling_info):
2378 operationType = 'SCALE-RO'
2379 match = {
2380 'member_vnf_index': vnf_index,
2381 'RO_nsr_id': RO_nsr_id,
2382 'RO_scaling_info': RO_scaling_info,
2383 }
2384 else:
2385 match = {
2386 'member_vnf_index': vnf_index,
2387 'primitive': vnf_config_primitive,
2388 'primitive_params': primitive_params,
2389 'lcmOperationType': operationType
2390 }
2391 op_index = self._find_suboperation(db_nslcmop, match)
2392 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
2393 # a. New sub-operation
2394 # The sub-operation does not exist, add it.
2395 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2396 # The following parameters are set to None for all kind of scaling:
2397 vdu_id = None
2398 vdu_count_index = None
2399 vdu_name = None
2400 if (RO_nsr_id and RO_scaling_info):
2401 vnf_config_primitive = None
2402 primitive_params = None
2403 else:
2404 RO_nsr_id = None
2405 RO_scaling_info = None
2406 # Initial status for sub-operation
2407 operationState = 'PROCESSING'
2408 detailed_status = 'In progress'
2409 # Add sub-operation for pre/post-scaling (zero or more operations)
2410 self._add_suboperation(db_nslcmop,
2411 vnf_index,
2412 vdu_id,
2413 vdu_count_index,
2414 vdu_name,
2415 vnf_config_primitive,
2416 primitive_params,
2417 operationState,
2418 detailed_status,
2419 operationType,
2420 RO_nsr_id,
2421 RO_scaling_info)
2422 return self.SUBOPERATION_STATUS_NEW
2423 else:
2424 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2425 # or op_index (operationState != 'COMPLETED')
2426 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
2427
2428 # Function to return execution_environment id
2429
2430 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2431 for vca in vca_deployed_list:
2432 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2433 return vca["ee_id"]
2434
2435 # Helper methods for terminate()
2436
2437 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
2438 """ Create a primitive with params from VNFD
2439 Called from terminate() before deleting instance
2440 Calls action() to execute the primitive """
2441 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2442 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2443 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2444 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2445 db_vnfds = {}
2446 # Loop over VNFRs
2447 for vnfr in db_vnfrs_list:
2448 vnfd_id = vnfr["vnfd-id"]
2449 vnf_index = vnfr["member-vnf-index-ref"]
2450 if vnfd_id not in db_vnfds:
2451 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2452 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2453 db_vnfds[vnfd_id] = vnfd
2454 vnfd = db_vnfds[vnfd_id]
2455 if not self._has_terminate_config_primitive(vnfd):
2456 continue
2457 # Get the primitive's sorted sequence list
2458 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2459 for seq in seq_list:
2460 # For each sequence in list, get primitive and call _ns_execute_primitive()
2461 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2462 vnf_index, seq.get("name"))
2463 self.logger.debug(logging_text + step)
2464 # Create the primitive for each sequence, i.e. "primitive": "touch"
2465 primitive = seq.get('name')
2466 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2467 # The following 3 parameters are currently set to None for 'terminate':
2468 # vdu_id, vdu_count_index, vdu_name
2469 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2470 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2471 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2472 # Add sub-operation
2473 self._add_suboperation(db_nslcmop,
2474 nslcmop_id,
2475 vnf_index,
2476 vdu_id,
2477 vdu_count_index,
2478 vdu_name,
2479 primitive,
2480 mapped_primitive_params)
2481 # Sub-operations: Call _ns_execute_primitive() instead of action()
2482 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2483 # nsr_deployed = db_nsr["_admin"]["deployed"]
2484
2485 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2486 # nsr_id, nslcmop_terminate_action_id)
2487 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2488 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2489 # if result not in result_ok:
2490 # raise LcmException(
2491 # "terminate_primitive_action for vnf_member_index={}",
2492 # " primitive={} fails with error {}".format(
2493 # vnf_index, seq.get("name"), result_detail))
2494
2495 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2496 try:
2497 await self.n2vc.exec_primitive(
2498 ee_id=ee_id,
2499 primitive_name=primitive,
2500 params_dict=mapped_primitive_params
2501 )
2502 except Exception as e:
2503 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2504 raise LcmException(
2505 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2506 .format(vnf_index, seq.get("name"), e),
2507 )
2508
2509 async def _delete_N2VC(self, nsr_id: str):
2510 self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
2511 namespace = "." + nsr_id
2512 await self.n2vc.delete_namespace(namespace=namespace)
2513 self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
2514
2515 async def terminate(self, nsr_id, nslcmop_id):
2516
2517 # Try to lock HA task here
2518 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2519 if not task_is_locked_by_me:
2520 return
2521
2522 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2523 self.logger.debug(logging_text + "Enter")
2524 db_nsr = None
2525 db_nslcmop = None
2526 exc = None
2527 failed_detail = [] # annotates all failed error messages
2528 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2529 "_admin.current-operation": nslcmop_id,
2530 "_admin.operation-type": "terminate"}
2531 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2532 db_nslcmop_update = {}
2533 nslcmop_operation_state = None
2534 autoremove = False # autoremove after terminated
2535 pending_tasks = []
2536 try:
2537 # wait for any previous tasks in process
2538 step = "Waiting for previous operations to terminate"
2539 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2540
2541 self._write_ns_status(
2542 nsr_id=nsr_id,
2543 ns_state="TERMINATING",
2544 current_operation="TERMINATING",
2545 current_operation_id=nslcmop_id
2546 )
2547 self._write_op_status(
2548 op_id=nslcmop_id,
2549 queuePosition=0
2550 )
2551
2552 step = "Getting nslcmop={} from db".format(nslcmop_id)
2553 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2554 step = "Getting nsr={} from db".format(nsr_id)
2555 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2556 # nsd = db_nsr["nsd"]
2557 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2558 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2559 return
2560 # #TODO check if VIM is creating and wait
2561 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2562 # Call internal terminate action
2563 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2564
2565 pending_tasks = []
2566
2567 db_nsr_update["operational-status"] = "terminating"
2568 db_nsr_update["config-status"] = "terminating"
2569
2570 # remove NS
2571 try:
2572 step = "delete execution environment"
2573 self.logger.debug(logging_text + step)
2574
2575 task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
2576 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2577
2578 pending_tasks.append(task_delete_ee)
2579 except Exception as e:
2580 msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
2581 self.logger.error(msg)
2582 failed_detail.append(msg)
2583
2584 try:
2585 # Delete from k8scluster
2586 step = "delete kdus"
2587 self.logger.debug(logging_text + step)
2588 # print(nsr_deployed)
2589 if nsr_deployed:
2590 for kdu in nsr_deployed.get("K8s", ()):
2591 kdu_instance = kdu.get("kdu-instance")
2592 if not kdu_instance:
2593 continue
2594 if kdu.get("k8scluster-type") == "chart":
2595 task_delete_kdu_instance = asyncio.ensure_future(
2596 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2597 kdu_instance=kdu_instance))
2598 elif kdu.get("k8scluster-type") == "juju":
2599 task_delete_kdu_instance = asyncio.ensure_future(
2600 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2601 kdu_instance=kdu_instance))
2602 else:
2603 self.error(logging_text + "Unknown k8s deployment type {}".
2604 format(kdu.get("k8scluster-type")))
2605 continue
2606 pending_tasks.append(task_delete_kdu_instance)
2607 except LcmException as e:
2608 msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
2609 self.logger.error(msg)
2610 failed_detail.append(msg)
2611
2612 # remove from RO
2613 RO_fail = False
2614
2615 # Delete ns
2616 RO_nsr_id = RO_delete_action = None
2617 if nsr_deployed and nsr_deployed.get("RO"):
2618 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2619 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2620 try:
2621 if RO_nsr_id:
2622 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2623 "Deleting ns from VIM"
2624 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2625 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2626 self.logger.debug(logging_text + step)
2627 desc = await self.RO.delete("ns", RO_nsr_id)
2628 RO_delete_action = desc["action_id"]
2629 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2630 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2631 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2632 if RO_delete_action:
2633 # wait until NS is deleted from VIM
2634 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2635 format(RO_nsr_id, RO_delete_action)
2636 detailed_status_old = None
2637 self.logger.debug(logging_text + step)
2638
2639 delete_timeout = 20 * 60 # 20 minutes
2640 while delete_timeout > 0:
2641 desc = await self.RO.show(
2642 "ns",
2643 item_id_name=RO_nsr_id,
2644 extra_item="action",
2645 extra_item_id=RO_delete_action)
2646
2647 # deploymentStatus
2648 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2649
2650 ns_status, ns_status_info = self.RO.check_action_status(desc)
2651 if ns_status == "ERROR":
2652 raise ROclient.ROClientException(ns_status_info)
2653 elif ns_status == "BUILD":
2654 detailed_status = step + "; {}".format(ns_status_info)
2655 elif ns_status == "ACTIVE":
2656 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2657 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2658 break
2659 else:
2660 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2661 if detailed_status != detailed_status_old:
2662 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2663 db_nsr_update["detailed-status"] = detailed_status
2664 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2665 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2666 await asyncio.sleep(5, loop=self.loop)
2667 delete_timeout -= 5
2668 else: # delete_timeout <= 0:
2669 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2670
2671 except ROclient.ROClientException as e:
2672 if e.http_code == 404: # not found
2673 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2674 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2675 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2676 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2677 elif e.http_code == 409: # conflict
2678 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2679 self.logger.debug(logging_text + failed_detail[-1])
2680 RO_fail = True
2681 else:
2682 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2683 self.logger.error(logging_text + failed_detail[-1])
2684 RO_fail = True
2685
2686 # Delete nsd
2687 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2688 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2689 try:
2690 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2691 "Deleting nsd from RO"
2692 await self.RO.delete("nsd", RO_nsd_id)
2693 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2694 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2695 except ROclient.ROClientException as e:
2696 if e.http_code == 404: # not found
2697 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2698 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2699 elif e.http_code == 409: # conflict
2700 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2701 self.logger.debug(logging_text + failed_detail[-1])
2702 RO_fail = True
2703 else:
2704 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2705 self.logger.error(logging_text + failed_detail[-1])
2706 RO_fail = True
2707
2708 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2709 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2710 if not vnf_deployed or not vnf_deployed["id"]:
2711 continue
2712 try:
2713 RO_vnfd_id = vnf_deployed["id"]
2714 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2715 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2716 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2717 await self.RO.delete("vnfd", RO_vnfd_id)
2718 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2719 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2720 except ROclient.ROClientException as e:
2721 if e.http_code == 404: # not found
2722 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2723 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2724 elif e.http_code == 409: # conflict
2725 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2726 self.logger.debug(logging_text + failed_detail[-1])
2727 else:
2728 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2729 self.logger.error(logging_text + failed_detail[-1])
2730
2731 if failed_detail:
2732 terminate_ok = False
2733 self.logger.error(logging_text + " ;".join(failed_detail))
2734 db_nsr_update["operational-status"] = "failed"
2735 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2736 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2737 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2738 db_nslcmop_update["statusEnteredTime"] = time()
2739 else:
2740 terminate_ok = True
2741 db_nsr_update["operational-status"] = "terminated"
2742 db_nsr_update["detailed-status"] = "Done"
2743 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2744 db_nslcmop_update["detailed-status"] = "Done"
2745 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2746 db_nslcmop_update["statusEnteredTime"] = time()
2747 if db_nslcmop["operationParams"].get("autoremove"):
2748 autoremove = True
2749
2750 except (ROclient.ROClientException, DbException, LcmException) as e:
2751 self.logger.error(logging_text + "Exit Exception {}".format(e))
2752 exc = e
2753 except asyncio.CancelledError:
2754 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2755 exc = "Operation was cancelled"
2756 except Exception as e:
2757 exc = traceback.format_exc()
2758 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2759 finally:
2760 if exc and db_nslcmop:
2761 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2762 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2763 db_nslcmop_update["statusEnteredTime"] = time()
2764 try:
2765 if db_nslcmop and db_nslcmop_update:
2766 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2767 if db_nsr:
2768 db_nsr_update["_admin.nslcmop"] = None
2769 db_nsr_update["_admin.current-operation"] = None
2770 db_nsr_update["_admin.operation-type"] = None
2771 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2772
2773 if terminate_ok:
2774 ns_state = "IDLE"
2775 error_description = None
2776 error_detail = None
2777 else:
2778 ns_state = "BROKEN"
2779 error_detail = "; ".join(failed_detail)
2780 error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
2781 .format(nslcmop_id, step, error_detail)
2782
2783 self._write_ns_status(
2784 nsr_id=nsr_id,
2785 ns_state=ns_state,
2786 current_operation="IDLE",
2787 current_operation_id=None,
2788 error_description=error_description
2789 )
2790
2791 self._write_op_status(
2792 op_id=nslcmop_id,
2793 error_message=error_description
2794 )
2795
2796 except DbException as e:
2797 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2798 if nslcmop_operation_state:
2799 try:
2800 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2801 "operationState": nslcmop_operation_state,
2802 "autoremove": autoremove},
2803 loop=self.loop)
2804 except Exception as e:
2805 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2806
2807 # wait for pending tasks
2808 done = None
2809 pending = None
2810 if pending_tasks:
2811 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2812 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2813 if not pending:
2814 self.logger.debug(logging_text + 'All tasks finished...')
2815 else:
2816 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2817
2818 self.logger.debug(logging_text + "Exit")
2819 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2820
2821 @staticmethod
2822 def _map_primitive_params(primitive_desc, params, instantiation_params):
2823 """
2824 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2825 The default-value is used. If it is between < > it look for a value at instantiation_params
2826 :param primitive_desc: portion of VNFD/NSD that describes primitive
2827 :param params: Params provided by user
2828 :param instantiation_params: Instantiation params provided by user
2829 :return: a dictionary with the calculated params
2830 """
2831 calculated_params = {}
2832 for parameter in primitive_desc.get("parameter", ()):
2833 param_name = parameter["name"]
2834 if param_name in params:
2835 calculated_params[param_name] = params[param_name]
2836 elif "default-value" in parameter or "value" in parameter:
2837 if "value" in parameter:
2838 calculated_params[param_name] = parameter["value"]
2839 else:
2840 calculated_params[param_name] = parameter["default-value"]
2841 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2842 and calculated_params[param_name].endswith(">"):
2843 if calculated_params[param_name][1:-1] in instantiation_params:
2844 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2845 else:
2846 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2847 format(calculated_params[param_name], primitive_desc["name"]))
2848 else:
2849 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2850 format(param_name, primitive_desc["name"]))
2851
2852 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2853 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2854 width=256)
2855 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2856 calculated_params[param_name] = calculated_params[param_name][7:]
2857
2858 # add always ns_config_info if primitive name is config
2859 if primitive_desc["name"] == "config":
2860 if "ns_config_info" in instantiation_params:
2861 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2862 return calculated_params
2863
2864 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2865 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2866
2867 # find vca_deployed record for this action
2868 try:
2869 for vca_deployed in db_deployed["VCA"]:
2870 if not vca_deployed:
2871 continue
2872 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2873 continue
2874 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2875 continue
2876 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2877 continue
2878 break
2879 else:
2880 # vca_deployed not found
2881 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2882 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2883
2884 # get ee_id
2885 ee_id = vca_deployed.get("ee_id")
2886 if not ee_id:
2887 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2888 "execution environment"
2889 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2890
2891 if primitive == "config":
2892 primitive_params = {"params": primitive_params}
2893
2894 while retries >= 0:
2895 try:
2896 output = await self.n2vc.exec_primitive(
2897 ee_id=ee_id,
2898 primitive_name=primitive,
2899 params_dict=primitive_params
2900 )
2901 # execution was OK
2902 break
2903 except Exception as e:
2904 retries -= 1
2905 if retries >= 0:
2906 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2907 # wait and retry
2908 await asyncio.sleep(retries_interval, loop=self.loop)
2909 else:
2910 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2911
2912 return output, 'OK'
2913
2914 except Exception as e:
2915 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2916
2917 async def action(self, nsr_id, nslcmop_id):
2918
2919 # Try to lock HA task here
2920 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2921 if not task_is_locked_by_me:
2922 return
2923
2924 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2925 self.logger.debug(logging_text + "Enter")
2926 # get all needed from database
2927 db_nsr = None
2928 db_nslcmop = None
2929 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2930 "_admin.current-operation": nslcmop_id,
2931 "_admin.operation-type": "action"}
2932 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2933 db_nslcmop_update = {}
2934 nslcmop_operation_state = None
2935 nslcmop_operation_state_detail = None
2936 exc = None
2937 try:
2938 # wait for any previous tasks in process
2939 step = "Waiting for previous operations to terminate"
2940 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2941
2942 self._write_ns_status(
2943 nsr_id=nsr_id,
2944 ns_state=None,
2945 current_operation="RUNNING ACTION",
2946 current_operation_id=nslcmop_id
2947 )
2948
2949 step = "Getting information from database"
2950 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2951 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2952
2953 nsr_deployed = db_nsr["_admin"].get("deployed")
2954 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2955 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2956 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2957 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2958 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2959
2960 if vnf_index:
2961 step = "Getting vnfr from database"
2962 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2963 step = "Getting vnfd from database"
2964 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2965 else:
2966 if db_nsr.get("nsd"):
2967 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2968 else:
2969 step = "Getting nsd from database"
2970 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2971
2972 # for backward compatibility
2973 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2974 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2975 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2976 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2977
2978 primitive = db_nslcmop["operationParams"]["primitive"]
2979 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2980
2981 # look for primitive
2982 config_primitive_desc = None
2983 if vdu_id:
2984 for vdu in get_iterable(db_vnfd, "vdu"):
2985 if vdu_id == vdu["id"]:
2986 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2987 if config_primitive["name"] == primitive:
2988 config_primitive_desc = config_primitive
2989 break
2990 elif kdu_name:
2991 self.logger.debug(logging_text + "Checking actions in KDUs")
2992 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2993 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2994 if primitive_params:
2995 desc_params.update(primitive_params)
2996 # TODO Check if we will need something at vnf level
2997 index = 0
2998 for kdu in get_iterable(nsr_deployed, "K8s"):
2999 if kdu_name == kdu["kdu-name"]:
3000 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
3001 "path": "_admin.deployed.K8s.{}".format(index)}
3002 if primitive == "upgrade":
3003 if desc_params.get("kdu_model"):
3004 kdu_model = desc_params.get("kdu_model")
3005 del desc_params["kdu_model"]
3006 else:
3007 kdu_model = kdu.get("kdu-model")
3008 parts = kdu_model.split(sep=":")
3009 if len(parts) == 2:
3010 kdu_model = parts[0]
3011
3012 if kdu.get("k8scluster-type") == "chart":
3013 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
3014 kdu_instance=kdu.get("kdu-instance"),
3015 atomic=True, kdu_model=kdu_model,
3016 params=desc_params, db_dict=db_dict,
3017 timeout=300)
3018 elif kdu.get("k8scluster-type") == "juju":
3019 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
3020 kdu_instance=kdu.get("kdu-instance"),
3021 atomic=True, kdu_model=kdu_model,
3022 params=desc_params, db_dict=db_dict,
3023 timeout=300)
3024
3025 else:
3026 msg = "k8scluster-type not defined"
3027 raise LcmException(msg)
3028
3029 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
3030 break
3031 elif primitive == "rollback":
3032 if kdu.get("k8scluster-type") == "chart":
3033 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
3034 kdu_instance=kdu.get("kdu-instance"),
3035 db_dict=db_dict)
3036 elif kdu.get("k8scluster-type") == "juju":
3037 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
3038 kdu_instance=kdu.get("kdu-instance"),
3039 db_dict=db_dict)
3040 else:
3041 msg = "k8scluster-type not defined"
3042 raise LcmException(msg)
3043 break
3044 elif primitive == "status":
3045 if kdu.get("k8scluster-type") == "chart":
3046 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
3047 kdu_instance=kdu.get("kdu-instance"))
3048 elif kdu.get("k8scluster-type") == "juju":
3049 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
3050 kdu_instance=kdu.get("kdu-instance"))
3051 else:
3052 msg = "k8scluster-type not defined"
3053 raise LcmException(msg)
3054 break
3055 index += 1
3056
3057 else:
3058 raise LcmException("KDU '{}' not found".format(kdu_name))
3059 if output:
3060 db_nslcmop_update["detailed-status"] = output
3061 db_nslcmop_update["operationState"] = 'COMPLETED'
3062 db_nslcmop_update["statusEnteredTime"] = time()
3063 else:
3064 db_nslcmop_update["detailed-status"] = ''
3065 db_nslcmop_update["operationState"] = 'FAILED'
3066 db_nslcmop_update["statusEnteredTime"] = time()
3067 return
3068 elif vnf_index:
3069 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3070 if config_primitive["name"] == primitive:
3071 config_primitive_desc = config_primitive
3072 break
3073 else:
3074 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
3075 if config_primitive["name"] == primitive:
3076 config_primitive_desc = config_primitive
3077 break
3078
3079 if not config_primitive_desc:
3080 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3081 format(primitive))
3082
3083 desc_params = {}
3084 if vnf_index:
3085 if db_vnfr.get("additionalParamsForVnf"):
3086 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
3087 if vdu_id:
3088 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3089 if vdur.get("additionalParams"):
3090 desc_params = self._format_additional_params(vdur["additionalParams"])
3091 else:
3092 if db_nsr.get("additionalParamsForNs"):
3093 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
3094
3095 # TODO check if ns is in a proper status
3096 output, detail = await self._ns_execute_primitive(
3097 db_deployed=nsr_deployed,
3098 member_vnf_index=vnf_index,
3099 vdu_id=vdu_id,
3100 vdu_name=vdu_name,
3101 vdu_count_index=vdu_count_index,
3102 primitive=primitive,
3103 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
3104
3105 detailed_status = output
3106 if detail == 'OK':
3107 result = 'COMPLETED'
3108 else:
3109 result = 'FAILED'
3110
3111 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
3112 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
3113 db_nslcmop_update["statusEnteredTime"] = time()
3114 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
3115 return # database update is called inside finally
3116
3117 except (DbException, LcmException) as e:
3118 self.logger.error(logging_text + "Exit Exception {}".format(e))
3119 exc = e
3120 except asyncio.CancelledError:
3121 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3122 exc = "Operation was cancelled"
3123 except Exception as e:
3124 exc = traceback.format_exc()
3125 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3126 finally:
3127 if exc and db_nslcmop:
3128 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
3129 "FAILED {}: {}".format(step, exc)
3130 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3131 db_nslcmop_update["statusEnteredTime"] = time()
3132 try:
3133 if db_nslcmop_update:
3134 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3135 if db_nsr:
3136 db_nsr_update["_admin.nslcmop"] = None
3137 db_nsr_update["_admin.operation-type"] = None
3138 db_nsr_update["_admin.nslcmop"] = None
3139 db_nsr_update["_admin.current-operation"] = None
3140 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3141 self._write_ns_status(
3142 nsr_id=nsr_id,
3143 ns_state=None,
3144 current_operation="IDLE",
3145 current_operation_id=None
3146 )
3147 if exc:
3148 self._write_op_status(
3149 op_id=nslcmop_id,
3150 error_message=nslcmop_operation_state_detail
3151 )
3152 except DbException as e:
3153 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3154 self.logger.debug(logging_text + "Exit")
3155 if nslcmop_operation_state:
3156 try:
3157 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3158 "operationState": nslcmop_operation_state},
3159 loop=self.loop)
3160 except Exception as e:
3161 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3162 self.logger.debug(logging_text + "Exit")
3163 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3164 return nslcmop_operation_state, nslcmop_operation_state_detail
3165
3166 async def scale(self, nsr_id, nslcmop_id):
3167
3168 # Try to lock HA task here
3169 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3170 if not task_is_locked_by_me:
3171 return
3172
3173 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3174 self.logger.debug(logging_text + "Enter")
3175 # get all needed from database
3176 db_nsr = None
3177 db_nslcmop = None
3178 db_nslcmop_update = {}
3179 nslcmop_operation_state = None
3180 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
3181 "_admin.current-operation": nslcmop_id,
3182 "_admin.operation-type": "scale"}
3183 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3184 exc = None
3185 # in case of error, indicates what part of scale was failed to put nsr at error status
3186 scale_process = None
3187 old_operational_status = ""
3188 old_config_status = ""
3189 vnfr_scaled = False
3190 try:
3191 # wait for any previous tasks in process
3192 step = "Waiting for previous operations to terminate"
3193 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3194
3195 self._write_ns_status(
3196 nsr_id=nsr_id,
3197 ns_state=None,
3198 current_operation="SCALING",
3199 current_operation_id=nslcmop_id
3200 )
3201
3202 step = "Getting nslcmop from database"
3203 self.logger.debug(step + " after having waited for previous tasks to be completed")
3204 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3205 step = "Getting nsr from database"
3206 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3207
3208 old_operational_status = db_nsr["operational-status"]
3209 old_config_status = db_nsr["config-status"]
3210 step = "Parsing scaling parameters"
3211 # self.logger.debug(step)
3212 db_nsr_update["operational-status"] = "scaling"
3213 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3214 nsr_deployed = db_nsr["_admin"].get("deployed")
3215
3216 #######
3217 nsr_deployed = db_nsr["_admin"].get("deployed")
3218 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3219 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3220 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3221 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3222 #######
3223
3224 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3225 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3226 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3227 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3228 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3229
3230 # for backward compatibility
3231 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3232 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3233 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3234 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3235
3236 step = "Getting vnfr from database"
3237 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3238 step = "Getting vnfd from database"
3239 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3240
3241 step = "Getting scaling-group-descriptor"
3242 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3243 if scaling_descriptor["name"] == scaling_group:
3244 break
3245 else:
3246 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3247 "at vnfd:scaling-group-descriptor".format(scaling_group))
3248
3249 # cooldown_time = 0
3250 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3251 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3252 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3253 # break
3254
3255 # TODO check if ns is in a proper status
3256 step = "Sending scale order to VIM"
3257 nb_scale_op = 0
3258 if not db_nsr["_admin"].get("scaling-group"):
3259 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3260 admin_scale_index = 0
3261 else:
3262 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3263 if admin_scale_info["name"] == scaling_group:
3264 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3265 break
3266 else: # not found, set index one plus last element and add new entry with the name
3267 admin_scale_index += 1
3268 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3269 RO_scaling_info = []
3270 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3271 if scaling_type == "SCALE_OUT":
3272 # count if max-instance-count is reached
3273 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3274 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3275 if nb_scale_op >= max_instance_count:
3276 raise LcmException("reached the limit of {} (max-instance-count) "
3277 "scaling-out operations for the "
3278 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3279
3280 nb_scale_op += 1
3281 vdu_scaling_info["scaling_direction"] = "OUT"
3282 vdu_scaling_info["vdu-create"] = {}
3283 for vdu_scale_info in scaling_descriptor["vdu"]:
3284 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3285 "type": "create", "count": vdu_scale_info.get("count", 1)})
3286 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3287
3288 elif scaling_type == "SCALE_IN":
3289 # count if min-instance-count is reached
3290 min_instance_count = 0
3291 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3292 min_instance_count = int(scaling_descriptor["min-instance-count"])
3293 if nb_scale_op <= min_instance_count:
3294 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3295 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3296 nb_scale_op -= 1
3297 vdu_scaling_info["scaling_direction"] = "IN"
3298 vdu_scaling_info["vdu-delete"] = {}
3299 for vdu_scale_info in scaling_descriptor["vdu"]:
3300 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3301 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3302 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3303
3304 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3305 vdu_create = vdu_scaling_info.get("vdu-create")
3306 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3307 if vdu_scaling_info["scaling_direction"] == "IN":
3308 for vdur in reversed(db_vnfr["vdur"]):
3309 if vdu_delete.get(vdur["vdu-id-ref"]):
3310 vdu_delete[vdur["vdu-id-ref"]] -= 1
3311 vdu_scaling_info["vdu"].append({
3312 "name": vdur["name"],
3313 "vdu_id": vdur["vdu-id-ref"],
3314 "interface": []
3315 })
3316 for interface in vdur["interfaces"]:
3317 vdu_scaling_info["vdu"][-1]["interface"].append({
3318 "name": interface["name"],
3319 "ip_address": interface["ip-address"],
3320 "mac_address": interface.get("mac-address"),
3321 })
3322 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3323
3324 # PRE-SCALE BEGIN
3325 step = "Executing pre-scale vnf-config-primitive"
3326 if scaling_descriptor.get("scaling-config-action"):
3327 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3328 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3329 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3330 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3331 step = db_nslcmop_update["detailed-status"] = \
3332 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3333
3334 # look for primitive
3335 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3336 if config_primitive["name"] == vnf_config_primitive:
3337 break
3338 else:
3339 raise LcmException(
3340 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3341 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3342 "primitive".format(scaling_group, config_primitive))
3343
3344 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3345 if db_vnfr.get("additionalParamsForVnf"):
3346 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3347
3348 scale_process = "VCA"
3349 db_nsr_update["config-status"] = "configuring pre-scaling"
3350 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3351
3352 # Pre-scale reintent check: Check if this sub-operation has been executed before
3353 op_index = self._check_or_add_scale_suboperation(
3354 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3355 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3356 # Skip sub-operation
3357 result = 'COMPLETED'
3358 result_detail = 'Done'
3359 self.logger.debug(logging_text +
3360 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3361 vnf_config_primitive, result, result_detail))
3362 else:
3363 if (op_index == self.SUBOPERATION_STATUS_NEW):
3364 # New sub-operation: Get index of this sub-operation
3365 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3366 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3367 format(vnf_config_primitive))
3368 else:
3369 # Reintent: Get registered params for this existing sub-operation
3370 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3371 vnf_index = op.get('member_vnf_index')
3372 vnf_config_primitive = op.get('primitive')
3373 primitive_params = op.get('primitive_params')
3374 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3375 format(vnf_config_primitive))
3376 # Execute the primitive, either with new (first-time) or registered (reintent) args
3377 result, result_detail = await self._ns_execute_primitive(
3378 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3379 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3380 vnf_config_primitive, result, result_detail))
3381 # Update operationState = COMPLETED | FAILED
3382 self._update_suboperation_status(
3383 db_nslcmop, op_index, result, result_detail)
3384
3385 if result == "FAILED":
3386 raise LcmException(result_detail)
3387 db_nsr_update["config-status"] = old_config_status
3388 scale_process = None
3389 # PRE-SCALE END
3390
3391 # SCALE RO - BEGIN
3392 # Should this block be skipped if 'RO_nsr_id' == None ?
3393 # if (RO_nsr_id and RO_scaling_info):
3394 if RO_scaling_info:
3395 scale_process = "RO"
3396 # Scale RO reintent check: Check if this sub-operation has been executed before
3397 op_index = self._check_or_add_scale_suboperation(
3398 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3399 if (op_index == self.SUBOPERATION_STATUS_SKIP):
3400 # Skip sub-operation
3401 result = 'COMPLETED'
3402 result_detail = 'Done'
3403 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3404 result, result_detail))
3405 else:
3406 if (op_index == self.SUBOPERATION_STATUS_NEW):
3407 # New sub-operation: Get index of this sub-operation
3408 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3409 self.logger.debug(logging_text + "New sub-operation RO")
3410 else:
3411 # Reintent: Get registered params for this existing sub-operation
3412 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3413 RO_nsr_id = op.get('RO_nsr_id')
3414 RO_scaling_info = op.get('RO_scaling_info')
3415 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
3416 vnf_config_primitive))
3417
3418 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3419 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3420 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3421 # wait until ready
3422 RO_nslcmop_id = RO_desc["instance_action_id"]
3423 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3424
3425 RO_task_done = False
3426 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3427 detailed_status_old = None
3428 self.logger.debug(logging_text + step)
3429
3430 deployment_timeout = 1 * 3600 # One hour
3431 while deployment_timeout > 0:
3432 if not RO_task_done:
3433 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3434 extra_item_id=RO_nslcmop_id)
3435
3436 # deploymentStatus
3437 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3438
3439 ns_status, ns_status_info = self.RO.check_action_status(desc)
3440 if ns_status == "ERROR":
3441 raise ROclient.ROClientException(ns_status_info)
3442 elif ns_status == "BUILD":
3443 detailed_status = step + "; {}".format(ns_status_info)
3444 elif ns_status == "ACTIVE":
3445 RO_task_done = True
3446 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
3447 self.logger.debug(logging_text + step)
3448 else:
3449 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3450 else:
3451
3452 if ns_status == "ERROR":
3453 raise ROclient.ROClientException(ns_status_info)
3454 elif ns_status == "BUILD":
3455 detailed_status = step + "; {}".format(ns_status_info)
3456 elif ns_status == "ACTIVE":
3457 step = detailed_status = \
3458 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3459 if not vnfr_scaled:
3460 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
3461 vnfr_scaled = True
3462 try:
3463 desc = await self.RO.show("ns", RO_nsr_id)
3464
3465 # deploymentStatus
3466 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3467
3468 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3469 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
3470 break
3471 except LcmExceptionNoMgmtIP:
3472 pass
3473 else:
3474 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3475 if detailed_status != detailed_status_old:
3476 self._update_suboperation_status(
3477 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3478 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3479 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3480
3481 await asyncio.sleep(5, loop=self.loop)
3482 deployment_timeout -= 5
3483 if deployment_timeout <= 0:
3484 self._update_suboperation_status(
3485 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3486 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3487
3488 # update VDU_SCALING_INFO with the obtained ip_addresses
3489 if vdu_scaling_info["scaling_direction"] == "OUT":
3490 for vdur in reversed(db_vnfr["vdur"]):
3491 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3492 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3493 vdu_scaling_info["vdu"].append({
3494 "name": vdur["name"],
3495 "vdu_id": vdur["vdu-id-ref"],
3496 "interface": []
3497 })
3498 for interface in vdur["interfaces"]:
3499 vdu_scaling_info["vdu"][-1]["interface"].append({
3500 "name": interface["name"],
3501 "ip_address": interface["ip-address"],
3502 "mac_address": interface.get("mac-address"),
3503 })
3504 del vdu_scaling_info["vdu-create"]
3505
3506 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3507 # SCALE RO - END
3508
3509 scale_process = None
3510 if db_nsr_update:
3511 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3512
3513 # POST-SCALE BEGIN
3514 # execute primitive service POST-SCALING
3515 step = "Executing post-scale vnf-config-primitive"
3516 if scaling_descriptor.get("scaling-config-action"):
3517 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3518 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3519 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3520 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3521 step = db_nslcmop_update["detailed-status"] = \
3522 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3523
3524 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3525 if db_vnfr.get("additionalParamsForVnf"):
3526 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3527
3528 # look for primitive
3529 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3530 if config_primitive["name"] == vnf_config_primitive:
3531 break
3532 else:
3533 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3534 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3535 "match any vnf-configuration:config-primitive".format(scaling_group,
3536 config_primitive))
3537 scale_process = "VCA"
3538 db_nsr_update["config-status"] = "configuring post-scaling"
3539 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3540
3541 # Post-scale reintent check: Check if this sub-operation has been executed before
3542 op_index = self._check_or_add_scale_suboperation(
3543 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3544 if op_index == self.SUBOPERATION_STATUS_SKIP:
3545 # Skip sub-operation
3546 result = 'COMPLETED'
3547 result_detail = 'Done'
3548 self.logger.debug(logging_text +
3549 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3550 format(vnf_config_primitive, result, result_detail))
3551 else:
3552 if op_index == self.SUBOPERATION_STATUS_NEW:
3553 # New sub-operation: Get index of this sub-operation
3554 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3555 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3556 format(vnf_config_primitive))
3557 else:
3558 # Reintent: Get registered params for this existing sub-operation
3559 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3560 vnf_index = op.get('member_vnf_index')
3561 vnf_config_primitive = op.get('primitive')
3562 primitive_params = op.get('primitive_params')
3563 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3564 format(vnf_config_primitive))
3565 # Execute the primitive, either with new (first-time) or registered (reintent) args
3566 result, result_detail = await self._ns_execute_primitive(
3567 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3568 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3569 vnf_config_primitive, result, result_detail))
3570 # Update operationState = COMPLETED | FAILED
3571 self._update_suboperation_status(
3572 db_nslcmop, op_index, result, result_detail)
3573
3574 if result == "FAILED":
3575 raise LcmException(result_detail)
3576 db_nsr_update["config-status"] = old_config_status
3577 scale_process = None
3578 # POST-SCALE END
3579
3580 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3581 db_nslcmop_update["statusEnteredTime"] = time()
3582 db_nslcmop_update["detailed-status"] = "done"
3583 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3584 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3585 else old_operational_status
3586 db_nsr_update["config-status"] = old_config_status
3587 return
3588 except (ROclient.ROClientException, DbException, LcmException) as e:
3589 self.logger.error(logging_text + "Exit Exception {}".format(e))
3590 exc = e
3591 except asyncio.CancelledError:
3592 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3593 exc = "Operation was cancelled"
3594 except Exception as e:
3595 exc = traceback.format_exc()
3596 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3597 finally:
3598 self._write_ns_status(
3599 nsr_id=nsr_id,
3600 ns_state=None,
3601 current_operation="IDLE",
3602 current_operation_id=None
3603 )
3604 if exc:
3605 if db_nslcmop:
3606 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3607 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3608 db_nslcmop_update["statusEnteredTime"] = time()
3609 if db_nsr:
3610 db_nsr_update["operational-status"] = old_operational_status
3611 db_nsr_update["config-status"] = old_config_status
3612 db_nsr_update["detailed-status"] = ""
3613 db_nsr_update["_admin.nslcmop"] = None
3614 if scale_process:
3615 if "VCA" in scale_process:
3616 db_nsr_update["config-status"] = "failed"
3617 if "RO" in scale_process:
3618 db_nsr_update["operational-status"] = "failed"
3619 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3620 exc)
3621 try:
3622 if db_nslcmop and db_nslcmop_update:
3623 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3624 if db_nsr:
3625 db_nsr_update["_admin.current-operation"] = None
3626 db_nsr_update["_admin.operation-type"] = None
3627 db_nsr_update["_admin.nslcmop"] = None
3628 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3629
3630 self._write_ns_status(
3631 nsr_id=nsr_id,
3632 ns_state=None,
3633 current_operation="IDLE",
3634 current_operation_id=None
3635 )
3636
3637 except DbException as e:
3638 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3639 if nslcmop_operation_state:
3640 try:
3641 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3642 "operationState": nslcmop_operation_state},
3643 loop=self.loop)
3644 # if cooldown_time:
3645 # await asyncio.sleep(cooldown_time, loop=self.loop)
3646 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3647 except Exception as e:
3648 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3649 self.logger.debug(logging_text + "Exit")
3650 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")