Provide more info upon instantiate task exceptions
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
25
26 from osm_lcm import ROclient
27 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
28 from n2vc.k8s_helm_conn import K8sHelmConnector
29 from n2vc.k8s_juju_conn import K8sJujuConnector
30
31 from osm_common.dbbase import DbException
32 from osm_common.fsbase import FsException
33
34 from n2vc.n2vc_juju_conn import N2VCJujuConnector
35 from n2vc.exceptions import N2VCException
36
37 from copy import copy, deepcopy
38 from http import HTTPStatus
39 from time import time
40 from uuid import uuid4
41
42 __author__ = "Alfonso Tierno"
43
44
45 def get_iterable(in_dict, in_key):
46 """
47 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
48 :param in_dict: a dictionary
49 :param in_key: the key to look for at in_dict
50 :return: in_dict[in_var] or () if it is None or not present
51 """
52 if not in_dict.get(in_key):
53 return ()
54 return in_dict[in_key]
55
56
57 def populate_dict(target_dict, key_list, value):
58 """
59 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
60 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
61 :param target_dict: dictionary to be changed
62 :param key_list: list of keys to insert at target_dict
63 :param value:
64 :return: None
65 """
66 for key in key_list[0:-1]:
67 if key not in target_dict:
68 target_dict[key] = {}
69 target_dict = target_dict[key]
70 target_dict[key_list[-1]] = value
71
72
73 class NsLcm(LcmBase):
74 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
75 total_deploy_timeout = 2 * 3600 # global timeout for deployment
76 timeout_charm_delete = 10 * 60
77 timeout_primitive = 10 * 60 # timeout for primitive execution
78
79 SUBOPERATION_STATUS_NOT_FOUND = -1
80 SUBOPERATION_STATUS_NEW = -2
81 SUBOPERATION_STATUS_SKIP = -3
82
83 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
84 """
85 Init, Connect to database, filesystem storage, and messaging
86 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
87 :return: None
88 """
89 super().__init__(
90 db=db,
91 msg=msg,
92 fs=fs,
93 logger=logging.getLogger('lcm.ns')
94 )
95
96 self.loop = loop
97 self.lcm_tasks = lcm_tasks
98 self.ro_config = ro_config
99 self.vca_config = vca_config
100 if 'pubkey' in self.vca_config:
101 self.vca_config['public_key'] = self.vca_config['pubkey']
102 if 'cacert' in self.vca_config:
103 self.vca_config['ca_cert'] = self.vca_config['cacert']
104 if 'apiproxy' in self.vca_config:
105 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
106
107 # create N2VC connector
108 self.n2vc = N2VCJujuConnector(
109 db=self.db,
110 fs=self.fs,
111 log=self.logger,
112 loop=self.loop,
113 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
114 username=self.vca_config.get('user', None),
115 vca_config=self.vca_config,
116 on_update_db=self._on_update_n2vc_db,
117 # ca_cert=self.vca_config.get('cacert'),
118 # api_proxy=self.vca_config.get('apiproxy'),
119 )
120
121 self.k8sclusterhelm = K8sHelmConnector(
122 kubectl_command=self.vca_config.get("kubectlpath"),
123 helm_command=self.vca_config.get("helmpath"),
124 fs=self.fs,
125 log=self.logger,
126 db=self.db,
127 on_update_db=None,
128 )
129
130 self.k8sclusterjuju = K8sJujuConnector(
131 kubectl_command=self.vca_config.get("kubectlpath"),
132 juju_command=self.vca_config.get("jujupath"),
133 fs=self.fs,
134 log=self.logger,
135 db=self.db,
136 on_update_db=None,
137 )
138
139 # create RO client
140 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
141
142 def _on_update_n2vc_db(self, table, filter, path, updated_data):
143
144 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
145 .format(table, filter, path, updated_data))
146
147 return
148 # write NS status to database
149 # try:
150 # # nsrs_id = filter.get('_id')
151 # # print(nsrs_id)
152 # # get ns record
153 # nsr = self.db.get_one(table=table, q_filter=filter)
154 # # get VCA deployed list
155 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
156 # # get RO deployed
157 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
158 # for vca in vca_list:
159 # # status = vca.get('status')
160 # # print(status)
161 # # detailed_status = vca.get('detailed-status')
162 # # print(detailed_status)
163 # # for ro in ro_list:
164 # # print(ro)
165 #
166 # except Exception as e:
167 # self.logger.error('Error writing NS status to db: {}'.format(e))
168
169 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
170 """
171 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
172 :param vnfd: input vnfd
173 :param new_id: overrides vnf id if provided
174 :param additionalParams: Instantiation params for VNFs provided
175 :param nsrId: Id of the NSR
176 :return: copy of vnfd
177 """
178 try:
179 vnfd_RO = deepcopy(vnfd)
180 # remove unused by RO configuration, monitoring, scaling and internal keys
181 vnfd_RO.pop("_id", None)
182 vnfd_RO.pop("_admin", None)
183 vnfd_RO.pop("vnf-configuration", None)
184 vnfd_RO.pop("monitoring-param", None)
185 vnfd_RO.pop("scaling-group-descriptor", None)
186 vnfd_RO.pop("kdu", None)
187 vnfd_RO.pop("k8s-cluster", None)
188 if new_id:
189 vnfd_RO["id"] = new_id
190
191 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
192 for vdu in get_iterable(vnfd_RO, "vdu"):
193 cloud_init_file = None
194 if vdu.get("cloud-init-file"):
195 base_folder = vnfd["_admin"]["storage"]
196 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
197 vdu["cloud-init-file"])
198 with self.fs.file_open(cloud_init_file, "r") as ci_file:
199 cloud_init_content = ci_file.read()
200 vdu.pop("cloud-init-file", None)
201 elif vdu.get("cloud-init"):
202 cloud_init_content = vdu["cloud-init"]
203 else:
204 continue
205
206 env = Environment()
207 ast = env.parse(cloud_init_content)
208 mandatory_vars = meta.find_undeclared_variables(ast)
209 if mandatory_vars:
210 for var in mandatory_vars:
211 if not additionalParams or var not in additionalParams.keys():
212 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
213 "file, must be provided in the instantiation parameters inside the "
214 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
215 template = Template(cloud_init_content)
216 cloud_init_content = template.render(additionalParams or {})
217 vdu["cloud-init"] = cloud_init_content
218
219 return vnfd_RO
220 except FsException as e:
221 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
222 format(vnfd["id"], vdu["id"], cloud_init_file, e))
223 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
224 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
225 format(vnfd["id"], vdu["id"], e))
226
227 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
228 """
229 Creates a RO ns descriptor from OSM ns_instantiate params
230 :param ns_params: OSM instantiate params
231 :return: The RO ns descriptor
232 """
233 vim_2_RO = {}
234 wim_2_RO = {}
235 # TODO feature 1417: Check that no instantiation is set over PDU
236 # check if PDU forces a concrete vim-network-id and add it
237 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
238
239 def vim_account_2_RO(vim_account):
240 if vim_account in vim_2_RO:
241 return vim_2_RO[vim_account]
242
243 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
244 if db_vim["_admin"]["operationalState"] != "ENABLED":
245 raise LcmException("VIM={} is not available. operationalState={}".format(
246 vim_account, db_vim["_admin"]["operationalState"]))
247 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
248 vim_2_RO[vim_account] = RO_vim_id
249 return RO_vim_id
250
251 def wim_account_2_RO(wim_account):
252 if isinstance(wim_account, str):
253 if wim_account in wim_2_RO:
254 return wim_2_RO[wim_account]
255
256 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
257 if db_wim["_admin"]["operationalState"] != "ENABLED":
258 raise LcmException("WIM={} is not available. operationalState={}".format(
259 wim_account, db_wim["_admin"]["operationalState"]))
260 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
261 wim_2_RO[wim_account] = RO_wim_id
262 return RO_wim_id
263 else:
264 return wim_account
265
266 def ip_profile_2_RO(ip_profile):
267 RO_ip_profile = deepcopy((ip_profile))
268 if "dns-server" in RO_ip_profile:
269 if isinstance(RO_ip_profile["dns-server"], list):
270 RO_ip_profile["dns-address"] = []
271 for ds in RO_ip_profile.pop("dns-server"):
272 RO_ip_profile["dns-address"].append(ds['address'])
273 else:
274 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
275 if RO_ip_profile.get("ip-version") == "ipv4":
276 RO_ip_profile["ip-version"] = "IPv4"
277 if RO_ip_profile.get("ip-version") == "ipv6":
278 RO_ip_profile["ip-version"] = "IPv6"
279 if "dhcp-params" in RO_ip_profile:
280 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
281 return RO_ip_profile
282
283 if not ns_params:
284 return None
285 RO_ns_params = {
286 # "name": ns_params["nsName"],
287 # "description": ns_params.get("nsDescription"),
288 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
289 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
290 # "scenario": ns_params["nsdId"],
291 }
292
293 n2vc_key_list = n2vc_key_list or []
294 for vnfd_ref, vnfd in vnfd_dict.items():
295 vdu_needed_access = []
296 mgmt_cp = None
297 if vnfd.get("vnf-configuration"):
298 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
299 if ssh_required and vnfd.get("mgmt-interface"):
300 if vnfd["mgmt-interface"].get("vdu-id"):
301 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
302 elif vnfd["mgmt-interface"].get("cp"):
303 mgmt_cp = vnfd["mgmt-interface"]["cp"]
304
305 for vdu in vnfd.get("vdu", ()):
306 if vdu.get("vdu-configuration"):
307 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
308 if ssh_required:
309 vdu_needed_access.append(vdu["id"])
310 elif mgmt_cp:
311 for vdu_interface in vdu.get("interface"):
312 if vdu_interface.get("external-connection-point-ref") and \
313 vdu_interface["external-connection-point-ref"] == mgmt_cp:
314 vdu_needed_access.append(vdu["id"])
315 mgmt_cp = None
316 break
317
318 if vdu_needed_access:
319 for vnf_member in nsd.get("constituent-vnfd"):
320 if vnf_member["vnfd-id-ref"] != vnfd_ref:
321 continue
322 for vdu in vdu_needed_access:
323 populate_dict(RO_ns_params,
324 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
325 n2vc_key_list)
326
327 if ns_params.get("vduImage"):
328 RO_ns_params["vduImage"] = ns_params["vduImage"]
329
330 if ns_params.get("ssh_keys"):
331 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
332 for vnf_params in get_iterable(ns_params, "vnf"):
333 for constituent_vnfd in nsd["constituent-vnfd"]:
334 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
335 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
336 break
337 else:
338 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
339 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
340 if vnf_params.get("vimAccountId"):
341 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
342 vim_account_2_RO(vnf_params["vimAccountId"]))
343
344 for vdu_params in get_iterable(vnf_params, "vdu"):
345 # TODO feature 1417: check that this VDU exist and it is not a PDU
346 if vdu_params.get("volume"):
347 for volume_params in vdu_params["volume"]:
348 if volume_params.get("vim-volume-id"):
349 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
350 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
351 volume_params["vim-volume-id"])
352 if vdu_params.get("interface"):
353 for interface_params in vdu_params["interface"]:
354 if interface_params.get("ip-address"):
355 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
356 vdu_params["id"], "interfaces", interface_params["name"],
357 "ip_address"),
358 interface_params["ip-address"])
359 if interface_params.get("mac-address"):
360 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
361 vdu_params["id"], "interfaces", interface_params["name"],
362 "mac_address"),
363 interface_params["mac-address"])
364 if interface_params.get("floating-ip-required"):
365 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
366 vdu_params["id"], "interfaces", interface_params["name"],
367 "floating-ip"),
368 interface_params["floating-ip-required"])
369
370 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
371 if internal_vld_params.get("vim-network-name"):
372 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
373 internal_vld_params["name"], "vim-network-name"),
374 internal_vld_params["vim-network-name"])
375 if internal_vld_params.get("vim-network-id"):
376 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
377 internal_vld_params["name"], "vim-network-id"),
378 internal_vld_params["vim-network-id"])
379 if internal_vld_params.get("ip-profile"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
381 internal_vld_params["name"], "ip-profile"),
382 ip_profile_2_RO(internal_vld_params["ip-profile"]))
383 if internal_vld_params.get("provider-network"):
384
385 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
386 internal_vld_params["name"], "provider-network"),
387 internal_vld_params["provider-network"].copy())
388
389 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
390 # look for interface
391 iface_found = False
392 for vdu_descriptor in vnf_descriptor["vdu"]:
393 for vdu_interface in vdu_descriptor["interface"]:
394 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
395 if icp_params.get("ip-address"):
396 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
397 vdu_descriptor["id"], "interfaces",
398 vdu_interface["name"], "ip_address"),
399 icp_params["ip-address"])
400
401 if icp_params.get("mac-address"):
402 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
403 vdu_descriptor["id"], "interfaces",
404 vdu_interface["name"], "mac_address"),
405 icp_params["mac-address"])
406 iface_found = True
407 break
408 if iface_found:
409 break
410 else:
411 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
412 "internal-vld:id-ref={} is not present at vnfd:internal-"
413 "connection-point".format(vnf_params["member-vnf-index"],
414 icp_params["id-ref"]))
415
416 for vld_params in get_iterable(ns_params, "vld"):
417 if "ip-profile" in vld_params:
418 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
419 ip_profile_2_RO(vld_params["ip-profile"]))
420
421 if vld_params.get("provider-network"):
422
423 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
424 vld_params["provider-network"].copy())
425
426 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
427 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
428 wim_account_2_RO(vld_params["wimAccountId"])),
429 if vld_params.get("vim-network-name"):
430 RO_vld_sites = []
431 if isinstance(vld_params["vim-network-name"], dict):
432 for vim_account, vim_net in vld_params["vim-network-name"].items():
433 RO_vld_sites.append({
434 "netmap-use": vim_net,
435 "datacenter": vim_account_2_RO(vim_account)
436 })
437 else: # isinstance str
438 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
439 if RO_vld_sites:
440 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
441
442 if vld_params.get("vim-network-id"):
443 RO_vld_sites = []
444 if isinstance(vld_params["vim-network-id"], dict):
445 for vim_account, vim_net in vld_params["vim-network-id"].items():
446 RO_vld_sites.append({
447 "netmap-use": vim_net,
448 "datacenter": vim_account_2_RO(vim_account)
449 })
450 else: # isinstance str
451 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
452 if RO_vld_sites:
453 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
454 if vld_params.get("ns-net"):
455 if isinstance(vld_params["ns-net"], dict):
456 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
457 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
458 if RO_vld_ns_net:
459 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
460 if "vnfd-connection-point-ref" in vld_params:
461 for cp_params in vld_params["vnfd-connection-point-ref"]:
462 # look for interface
463 for constituent_vnfd in nsd["constituent-vnfd"]:
464 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
465 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
466 break
467 else:
468 raise LcmException(
469 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
470 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
471 match_cp = False
472 for vdu_descriptor in vnf_descriptor["vdu"]:
473 for interface_descriptor in vdu_descriptor["interface"]:
474 if interface_descriptor.get("external-connection-point-ref") == \
475 cp_params["vnfd-connection-point-ref"]:
476 match_cp = True
477 break
478 if match_cp:
479 break
480 else:
481 raise LcmException(
482 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
483 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
484 cp_params["member-vnf-index-ref"],
485 cp_params["vnfd-connection-point-ref"],
486 vnf_descriptor["id"]))
487 if cp_params.get("ip-address"):
488 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
489 vdu_descriptor["id"], "interfaces",
490 interface_descriptor["name"], "ip_address"),
491 cp_params["ip-address"])
492 if cp_params.get("mac-address"):
493 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
494 vdu_descriptor["id"], "interfaces",
495 interface_descriptor["name"], "mac_address"),
496 cp_params["mac-address"])
497 return RO_ns_params
498
499 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
500 # make a copy to do not change
501 vdu_create = copy(vdu_create)
502 vdu_delete = copy(vdu_delete)
503
504 vdurs = db_vnfr.get("vdur")
505 if vdurs is None:
506 vdurs = []
507 vdu_index = len(vdurs)
508 while vdu_index:
509 vdu_index -= 1
510 vdur = vdurs[vdu_index]
511 if vdur.get("pdu-type"):
512 continue
513 vdu_id_ref = vdur["vdu-id-ref"]
514 if vdu_create and vdu_create.get(vdu_id_ref):
515 for index in range(0, vdu_create[vdu_id_ref]):
516 vdur = deepcopy(vdur)
517 vdur["_id"] = str(uuid4())
518 vdur["count-index"] += 1
519 vdurs.insert(vdu_index+1+index, vdur)
520 del vdu_create[vdu_id_ref]
521 if vdu_delete and vdu_delete.get(vdu_id_ref):
522 del vdurs[vdu_index]
523 vdu_delete[vdu_id_ref] -= 1
524 if not vdu_delete[vdu_id_ref]:
525 del vdu_delete[vdu_id_ref]
526 # check all operations are done
527 if vdu_create or vdu_delete:
528 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
529 vdu_create))
530 if vdu_delete:
531 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
532 vdu_delete))
533
534 vnfr_update = {"vdur": vdurs}
535 db_vnfr["vdur"] = vdurs
536 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
537
538 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
539 """
540 Updates database nsr with the RO info for the created vld
541 :param ns_update_nsr: dictionary to be filled with the updated info
542 :param db_nsr: content of db_nsr. This is also modified
543 :param nsr_desc_RO: nsr descriptor from RO
544 :return: Nothing, LcmException is raised on errors
545 """
546
547 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
548 for net_RO in get_iterable(nsr_desc_RO, "nets"):
549 if vld["id"] != net_RO.get("ns_net_osm_id"):
550 continue
551 vld["vim-id"] = net_RO.get("vim_net_id")
552 vld["name"] = net_RO.get("vim_name")
553 vld["status"] = net_RO.get("status")
554 vld["status-detailed"] = net_RO.get("error_msg")
555 ns_update_nsr["vld.{}".format(vld_index)] = vld
556 break
557 else:
558 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
559
560 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
561 """
562 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
563 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
564 :param nsr_desc_RO: nsr descriptor from RO
565 :return: Nothing, LcmException is raised on errors
566 """
567 for vnf_index, db_vnfr in db_vnfrs.items():
568 for vnf_RO in nsr_desc_RO["vnfs"]:
569 if vnf_RO["member_vnf_index"] != vnf_index:
570 continue
571 vnfr_update = {}
572 if vnf_RO.get("ip_address"):
573 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
574 elif not db_vnfr.get("ip-address"):
575 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
576
577 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
578 vdur_RO_count_index = 0
579 if vdur.get("pdu-type"):
580 continue
581 for vdur_RO in get_iterable(vnf_RO, "vms"):
582 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
583 continue
584 if vdur["count-index"] != vdur_RO_count_index:
585 vdur_RO_count_index += 1
586 continue
587 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
588 if vdur_RO.get("ip_address"):
589 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
590 else:
591 vdur["ip-address"] = None
592 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
593 vdur["name"] = vdur_RO.get("vim_name")
594 vdur["status"] = vdur_RO.get("status")
595 vdur["status-detailed"] = vdur_RO.get("error_msg")
596 for ifacer in get_iterable(vdur, "interfaces"):
597 for interface_RO in get_iterable(vdur_RO, "interfaces"):
598 if ifacer["name"] == interface_RO.get("internal_name"):
599 ifacer["ip-address"] = interface_RO.get("ip_address")
600 ifacer["mac-address"] = interface_RO.get("mac_address")
601 break
602 else:
603 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
604 "from VIM info"
605 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
606 vnfr_update["vdur.{}".format(vdu_index)] = vdur
607 break
608 else:
609 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
610 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
611
612 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
613 for net_RO in get_iterable(nsr_desc_RO, "nets"):
614 if vld["id"] != net_RO.get("vnf_net_osm_id"):
615 continue
616 vld["vim-id"] = net_RO.get("vim_net_id")
617 vld["name"] = net_RO.get("vim_name")
618 vld["status"] = net_RO.get("status")
619 vld["status-detailed"] = net_RO.get("error_msg")
620 vnfr_update["vld.{}".format(vld_index)] = vld
621 break
622 else:
623 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
624 vnf_index, vld["id"]))
625
626 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
627 break
628
629 else:
630 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
631
632 @staticmethod
633 def _get_ns_config_info(vca_deployed_list):
634 """
635 Generates a mapping between vnf,vdu elements and the N2VC id
636 :param vca_deployed_list: List of database _admin.deploy.VCA that contains this list
637 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
638 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
639 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
640 """
641 mapping = {}
642 ns_config_info = {"osm-config-mapping": mapping}
643 for vca in vca_deployed_list:
644 if not vca["member-vnf-index"]:
645 continue
646 if not vca["vdu_id"]:
647 mapping[vca["member-vnf-index"]] = vca["application"]
648 else:
649 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
650 vca["application"]
651 return ns_config_info
652
653 @staticmethod
654 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
655 """
656 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
657 primitives as verify-ssh-credentials, or config when needed
658 :param desc_primitive_list: information of the descriptor
659 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
660 this element contains a ssh public key
661 :return: The modified list. Can ba an empty list, but always a list
662 """
663 if desc_primitive_list:
664 primitive_list = desc_primitive_list.copy()
665 else:
666 primitive_list = []
667 # look for primitive config, and get the position. None if not present
668 config_position = None
669 for index, primitive in enumerate(primitive_list):
670 if primitive["name"] == "config":
671 config_position = index
672 break
673
674 # for NS, add always a config primitive if not present (bug 874)
675 if not vca_deployed["member-vnf-index"] and config_position is None:
676 primitive_list.insert(0, {"name": "config", "parameter": []})
677 config_position = 0
678 # for VNF/VDU add verify-ssh-credentials after config
679 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
680 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
681 return primitive_list
682
683 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
684 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
685
686 db_nsr_update = {}
687 RO_descriptor_number = 0 # number of descriptors created at RO
688 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
689 start_deploy = time()
690 vdu_flag = False # If any of the VNFDs has VDUs
691 ns_params = db_nslcmop.get("operationParams")
692
693 # deploy RO
694
695 # get vnfds, instantiate at RO
696
697 for c_vnf in nsd.get("constituent-vnfd", ()):
698 member_vnf_index = c_vnf["member-vnf-index"]
699 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
700 if vnfd.get("vdu"):
701 vdu_flag = True
702 vnfd_ref = vnfd["id"]
703 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
704 " RO".format(vnfd_ref, member_vnf_index)
705 # self.logger.debug(logging_text + step)
706 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
707 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
708 RO_descriptor_number += 1
709
710 # look position at deployed.RO.vnfd if not present it will be appended at the end
711 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
712 if vnf_deployed["member-vnf-index"] == member_vnf_index:
713 break
714 else:
715 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
716 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
717
718 # look if present
719 RO_update = {"member-vnf-index": member_vnf_index}
720 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
721 if vnfd_list:
722 RO_update["id"] = vnfd_list[0]["uuid"]
723 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
724 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
725 else:
726 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
727 get("additionalParamsForVnf"), nsr_id)
728 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
729 RO_update["id"] = desc["uuid"]
730 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
731 vnfd_ref, member_vnf_index, desc["uuid"]))
732 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
733 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
734 self.update_db_2("nsrs", nsr_id, db_nsr_update)
735 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
736
737 # create nsd at RO
738 nsd_ref = nsd["id"]
739 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
740 # self.logger.debug(logging_text + step)
741
742 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
743 RO_descriptor_number += 1
744 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
745 if nsd_list:
746 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
747 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
748 nsd_ref, RO_nsd_uuid))
749 else:
750 nsd_RO = deepcopy(nsd)
751 nsd_RO["id"] = RO_osm_nsd_id
752 nsd_RO.pop("_id", None)
753 nsd_RO.pop("_admin", None)
754 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
755 member_vnf_index = c_vnf["member-vnf-index"]
756 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
757 for c_vld in nsd_RO.get("vld", ()):
758 for cp in c_vld.get("vnfd-connection-point-ref", ()):
759 member_vnf_index = cp["member-vnf-index-ref"]
760 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
761
762 desc = await self.RO.create("nsd", descriptor=nsd_RO)
763 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
764 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
765 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
766 self.update_db_2("nsrs", nsr_id, db_nsr_update)
767 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
768
769 # Crate ns at RO
770 # if present use it unless in error status
771 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
772 if RO_nsr_id:
773 try:
774 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
775 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
776 desc = await self.RO.show("ns", RO_nsr_id)
777 except ROclient.ROClientException as e:
778 if e.http_code != HTTPStatus.NOT_FOUND:
779 raise
780 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
781 if RO_nsr_id:
782 ns_status, ns_status_info = self.RO.check_ns_status(desc)
783 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
784 if ns_status == "ERROR":
785 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
786 .format(RO_nsr_id)
787 self.logger.debug(logging_text + step)
788 await self.RO.delete("ns", RO_nsr_id)
789 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
790 if not RO_nsr_id:
791 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
792 # self.logger.debug(logging_text + step)
793
794 # check if VIM is creating and wait look if previous tasks in process
795 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
796 if task_dependency:
797 step = "Waiting for related tasks to be completed: {}".format(task_name)
798 self.logger.debug(logging_text + step)
799 await asyncio.wait(task_dependency, timeout=3600)
800 if ns_params.get("vnf"):
801 for vnf in ns_params["vnf"]:
802 if "vimAccountId" in vnf:
803 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
804 vnf["vimAccountId"])
805 if task_dependency:
806 step = "Waiting for related tasks to be completed: {}".format(task_name)
807 self.logger.debug(logging_text + step)
808 await asyncio.wait(task_dependency, timeout=3600)
809
810 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
811
812 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
813
814 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
815 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
816 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
817 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
818 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
819 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
820 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
821 self.update_db_2("nsrs", nsr_id, db_nsr_update)
822 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
823
824 # wait until NS is ready
825 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
826 detailed_status_old = None
827 self.logger.debug(logging_text + step)
828
829 while time() <= start_deploy + self.total_deploy_timeout:
830 desc = await self.RO.show("ns", RO_nsr_id)
831 ns_status, ns_status_info = self.RO.check_ns_status(desc)
832 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
833 if ns_status == "ERROR":
834 raise ROclient.ROClientException(ns_status_info)
835 elif ns_status == "BUILD":
836 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
837 elif ns_status == "ACTIVE":
838 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
839 try:
840 if vdu_flag:
841 self.ns_update_vnfr(db_vnfrs, desc)
842 break
843 except LcmExceptionNoMgmtIP:
844 pass
845 else:
846 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
847 if detailed_status != detailed_status_old:
848 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
849 self.update_db_2("nsrs", nsr_id, db_nsr_update)
850 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
851 await asyncio.sleep(5, loop=self.loop)
852 else: # total_deploy_timeout
853 raise ROclient.ROClientException("Timeout waiting ns to be ready")
854
855 step = "Updating NSR"
856 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
857
858 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
859 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
860 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
861 self.update_db_2("nsrs", nsr_id, db_nsr_update)
862 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
863
864 step = "Deployed at VIM"
865 self.logger.debug(logging_text + step)
866
867 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
868 """
869 Wait for ip addres at RO, and optionally, insert public key in virtual machine
870 :param logging_text: prefix use for logging
871 :param nsr_id:
872 :param vnfr_id:
873 :param vdu_id:
874 :param vdu_index:
875 :param pub_key: public ssh key to inject, None to skip
876 :param user: user to apply the public ssh key
877 :return: IP address
878 """
879
880 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
881 ro_nsr_id = None
882 ip_address = None
883 nb_tries = 0
884 target_vdu_id = None
885 ro_retries = 0
886
887 while True:
888
889 ro_retries += 1
890 if ro_retries >= 360: # 1 hour
891 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
892
893 await asyncio.sleep(10, loop=self.loop)
894 # wait until NS is deployed at RO
895 if not ro_nsr_id:
896 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
897 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
898 if not ro_nsr_id:
899 continue
900
901 # get ip address
902 if not target_vdu_id:
903 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
904
905 if not vdu_id: # for the VNF case
906 ip_address = db_vnfr.get("ip-address")
907 if not ip_address:
908 continue
909 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
910 else: # VDU case
911 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
912 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
913
914 if not vdur:
915 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
916 vnfr_id, vdu_id, vdu_index
917 ))
918
919 if vdur.get("status") == "ACTIVE":
920 ip_address = vdur.get("ip-address")
921 if not ip_address:
922 continue
923 target_vdu_id = vdur["vdu-id-ref"]
924 elif vdur.get("status") == "ERROR":
925 raise LcmException("Cannot inject ssh-key because target VM is in error state")
926
927 if not target_vdu_id:
928 continue
929
930 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
931
932 # inject public key into machine
933 if pub_key and user:
934 # self.logger.debug(logging_text + "Inserting RO key")
935 try:
936 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
937 result_dict = await self.RO.create_action(
938 item="ns",
939 item_id_name=ro_nsr_id,
940 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
941 )
942 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
943 if not result_dict or not isinstance(result_dict, dict):
944 raise LcmException("Unknown response from RO when injecting key")
945 for result in result_dict.values():
946 if result.get("vim_result") == 200:
947 break
948 else:
949 raise ROclient.ROClientException("error injecting key: {}".format(
950 result.get("description")))
951 break
952 except ROclient.ROClientException as e:
953 if not nb_tries:
954 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
955 format(e, 20*10))
956 nb_tries += 1
957 if nb_tries >= 20:
958 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
959 else:
960 break
961
962 return ip_address
963
964 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
965 kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
966 nsr_id = db_nsr["_id"]
967 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
968 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
969 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
970 db_dict = {
971 'collection': 'nsrs',
972 'filter': {'_id': nsr_id},
973 'path': db_update_entry
974 }
975 logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} ".format(db_vnfr["member-vnf-index-ref"],
976 vdu_id, vdu_index)
977
978 step = ""
979 try:
980 vnfr_id = None
981 if db_vnfr:
982 vnfr_id = db_vnfr["_id"]
983
984 namespace = "{nsi}.{ns}".format(
985 nsi=nsi_id if nsi_id else "",
986 ns=nsr_id)
987 if vnfr_id:
988 namespace += "." + vnfr_id
989 if vdu_id:
990 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
991
992 # Get artifact path
993 artifact_path = "{}/{}/charms/{}".format(
994 base_folder["folder"],
995 base_folder["pkg-dir"],
996 config_descriptor["juju"]["charm"]
997 )
998
999 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1000 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1001 is_proxy_charm = False
1002
1003 # n2vc_redesign STEP 3.1
1004
1005 # find old ee_id if exists
1006 ee_id = vca_deployed.get("ee_id")
1007
1008 # create or register execution environment in VCA
1009 if is_proxy_charm:
1010 step = "create execution environment"
1011 self.logger.debug(logging_text + step)
1012 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1013 reuse_ee_id=ee_id,
1014 db_dict=db_dict)
1015 else:
1016 step = "Waiting to VM being up and getting IP address"
1017 self.logger.debug(logging_text + step)
1018 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1019 user=None, pub_key=None)
1020 credentials = {"hostname": rw_mgmt_ip}
1021 # get username
1022 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1023 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1024 # merged. Meanwhile let's get username from initial-config-primitive
1025 if not username and config_descriptor.get("initial-config-primitive"):
1026 for config_primitive in config_descriptor["initial-config-primitive"]:
1027 for param in config_primitive.get("parameter", ()):
1028 if param["name"] == "ssh-username":
1029 username = param["value"]
1030 break
1031 if not username:
1032 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1033 "'config-access.ssh-access.default-user'")
1034 credentials["username"] = username
1035 # n2vc_redesign STEP 3.2
1036
1037 step = "register execution environment {}".format(credentials)
1038 self.logger.debug(logging_text + step)
1039 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1040 db_dict=db_dict)
1041
1042 # for compatibility with MON/POL modules, the need model and application name at database
1043 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1044 ee_id_parts = ee_id.split('.')
1045 model_name = ee_id_parts[0]
1046 application_name = ee_id_parts[1]
1047 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1048 db_update_entry + "application": application_name,
1049 db_update_entry + "ee_id": ee_id})
1050
1051 # n2vc_redesign STEP 3.3
1052
1053 step = "Install configuration Software"
1054 # TODO check if already done
1055 self.logger.debug(logging_text + step)
1056 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1057
1058 # if SSH access is required, then get execution environment SSH public
1059 if is_proxy_charm: # if native charm we have waited already to VM be UP
1060 pub_key = None
1061 user = None
1062 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1063 # Needed to inject a ssh key
1064 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1065 step = "Install configuration Software, getting public ssh key"
1066 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1067
1068 step = "Insert public key into VM"
1069 else:
1070 step = "Waiting to VM being up and getting IP address"
1071 self.logger.debug(logging_text + step)
1072
1073 # n2vc_redesign STEP 5.1
1074 # wait for RO (ip-address) Insert pub_key into VM
1075 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1076 user=user, pub_key=pub_key)
1077
1078 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1079
1080 # store rw_mgmt_ip in deploy params for later replacement
1081 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1082
1083 # n2vc_redesign STEP 6 Execute initial config primitive
1084 step = 'execute initial config primitive'
1085 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1086
1087 # sort initial config primitives by 'seq'
1088 try:
1089 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1090 except Exception as e:
1091 self.logger.error(logging_text + step + ": " + str(e))
1092
1093 # add config if not present for NS charm
1094 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1095 vca_deployed)
1096
1097 for initial_config_primitive in initial_config_primitive_list:
1098 # adding information on the vca_deployed if it is a NS execution environment
1099 if not vca_deployed["member-vnf-index"]:
1100 deploy_params["ns_config_info"] = self._get_ns_config_info(vca_deployed_list)
1101 # TODO check if already done
1102 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1103
1104 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1105 self.logger.debug(logging_text + step)
1106 await self.n2vc.exec_primitive(
1107 ee_id=ee_id,
1108 primitive_name=initial_config_primitive["name"],
1109 params_dict=primitive_params_,
1110 db_dict=db_dict
1111 )
1112 # TODO register in database that primitive is done
1113
1114 step = "instantiated at VCA"
1115 self.logger.debug(logging_text + step)
1116
1117 except Exception as e: # TODO not use Exception but N2VC exception
1118 raise Exception("{} {}".format(step, e)) from e
1119 # TODO raise N2VC exception with 'step' extra information
1120
1121 async def instantiate(self, nsr_id, nslcmop_id):
1122 """
1123
1124 :param nsr_id: ns instance to deploy
1125 :param nslcmop_id: operation to run
1126 :return:
1127 """
1128
1129 # Try to lock HA task here
1130 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1131 if not task_is_locked_by_me:
1132 self.logger.debug('instantiate() task is not locked by me')
1133 return
1134
1135 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1136 self.logger.debug(logging_text + "Enter")
1137
1138 # get all needed from database
1139
1140 # database nsrs record
1141 db_nsr = None
1142
1143 # database nslcmops record
1144 db_nslcmop = None
1145
1146 # update operation on nsrs
1147 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1148 "_admin.current-operation": nslcmop_id,
1149 "_admin.operation-type": "instantiate"}
1150 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1151
1152 # update operation on nslcmops
1153 db_nslcmop_update = {}
1154
1155 nslcmop_operation_state = None
1156 db_vnfrs = {} # vnf's info indexed by member-index
1157 # n2vc_info = {}
1158 task_instantiation_list = []
1159 task_instantiation_info = {} # from task to info text
1160 exc = None
1161 try:
1162 # wait for any previous tasks in process
1163 step = "Waiting for previous operations to terminate"
1164 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1165
1166 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1167
1168 # read from db: operation
1169 step = "Getting nslcmop={} from db".format(nslcmop_id)
1170 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1171
1172 # read from db: ns
1173 step = "Getting nsr={} from db".format(nsr_id)
1174 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1175 # nsd is replicated into ns (no db read)
1176 nsd = db_nsr["nsd"]
1177 # nsr_name = db_nsr["name"] # TODO short-name??
1178
1179 # read from db: vnf's of this ns
1180 step = "Getting vnfrs from db"
1181 self.logger.debug(logging_text + step)
1182 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1183
1184 # read from db: vnfd's for every vnf
1185 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1186 db_vnfds = {} # every vnfd data indexed by vnf id
1187 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1188
1189 # for each vnf in ns, read vnfd
1190 for vnfr in db_vnfrs_list:
1191 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1192 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1193 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1194 # if we haven't this vnfd, read it from db
1195 if vnfd_id not in db_vnfds:
1196 # read from cb
1197 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1198 self.logger.debug(logging_text + step)
1199 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1200
1201 # store vnfd
1202 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1203 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1204 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1205
1206 # Get or generates the _admin.deployed.VCA list
1207 vca_deployed_list = None
1208 if db_nsr["_admin"].get("deployed"):
1209 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1210 if vca_deployed_list is None:
1211 vca_deployed_list = []
1212 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1213 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1214 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1215 elif isinstance(vca_deployed_list, dict):
1216 # maintain backward compatibility. Change a dict to list at database
1217 vca_deployed_list = list(vca_deployed_list.values())
1218 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1219 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1220
1221 db_nsr_update["detailed-status"] = "creating"
1222 db_nsr_update["operational-status"] = "init"
1223
1224 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1225 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1226 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1227
1228 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1229 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1230 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1231 self.logger.debug(logging_text + "Before deploy_kdus")
1232 # Call to deploy_kdus in case exists the "vdu:kdu" param
1233 task_kdu = asyncio.ensure_future(
1234 self.deploy_kdus(
1235 logging_text=logging_text,
1236 nsr_id=nsr_id,
1237 db_nsr=db_nsr,
1238 db_vnfrs=db_vnfrs,
1239 )
1240 )
1241 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1242 task_instantiation_info[task_kdu] = "Deploy KDUs"
1243 task_instantiation_list.append(task_kdu)
1244 # n2vc_redesign STEP 1 Get VCA public ssh-key
1245 # feature 1429. Add n2vc public key to needed VMs
1246 n2vc_key = self.n2vc.get_public_key()
1247 n2vc_key_list = [n2vc_key]
1248 if self.vca_config.get("public_key"):
1249 n2vc_key_list.append(self.vca_config["public_key"])
1250
1251 # n2vc_redesign STEP 2 Deploy Network Scenario
1252 task_ro = asyncio.ensure_future(
1253 self.instantiate_RO(
1254 logging_text=logging_text,
1255 nsr_id=nsr_id,
1256 nsd=nsd,
1257 db_nsr=db_nsr,
1258 db_nslcmop=db_nslcmop,
1259 db_vnfrs=db_vnfrs,
1260 db_vnfds_ref=db_vnfds_ref,
1261 n2vc_key_list=n2vc_key_list
1262 )
1263 )
1264 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1265 task_instantiation_info[task_ro] = "Deploy at VIM"
1266 task_instantiation_list.append(task_ro)
1267
1268 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1269 step = "Looking for needed vnfd to configure with proxy charm"
1270 self.logger.debug(logging_text + step)
1271
1272 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1273 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1274 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1275 vnfd_id = c_vnf["vnfd-id-ref"]
1276 vnfd = db_vnfds_ref[vnfd_id]
1277 member_vnf_index = str(c_vnf["member-vnf-index"])
1278 db_vnfr = db_vnfrs[member_vnf_index]
1279 base_folder = vnfd["_admin"]["storage"]
1280 vdu_id = None
1281 vdu_index = 0
1282 vdu_name = None
1283 kdu_name = None
1284
1285 # Get additional parameters
1286 deploy_params = {}
1287 if db_vnfr.get("additionalParamsForVnf"):
1288 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1289
1290 descriptor_config = vnfd.get("vnf-configuration")
1291 if descriptor_config and descriptor_config.get("juju"):
1292 self._deploy_n2vc(
1293 logging_text=logging_text,
1294 db_nsr=db_nsr,
1295 db_vnfr=db_vnfr,
1296 nslcmop_id=nslcmop_id,
1297 nsr_id=nsr_id,
1298 nsi_id=nsi_id,
1299 vnfd_id=vnfd_id,
1300 vdu_id=vdu_id,
1301 kdu_name=kdu_name,
1302 member_vnf_index=member_vnf_index,
1303 vdu_index=vdu_index,
1304 vdu_name=vdu_name,
1305 deploy_params=deploy_params,
1306 descriptor_config=descriptor_config,
1307 base_folder=base_folder,
1308 task_instantiation_list=task_instantiation_list,
1309 task_instantiation_info=task_instantiation_info
1310 )
1311
1312 # Deploy charms for each VDU that supports one.
1313 for vdud in get_iterable(vnfd, 'vdu'):
1314 vdu_id = vdud["id"]
1315 descriptor_config = vdud.get('vdu-configuration')
1316 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1317 if vdur.get("additionalParams"):
1318 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1319 else:
1320 deploy_params_vdu = deploy_params
1321 if descriptor_config and descriptor_config.get("juju"):
1322 # look for vdu index in the db_vnfr["vdu"] section
1323 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1324 # if vdur["vdu-id-ref"] == vdu_id:
1325 # break
1326 # else:
1327 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1328 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1329 # vdu_name = vdur.get("name")
1330 vdu_name = None
1331 kdu_name = None
1332 for vdu_index in range(int(vdud.get("count", 1))):
1333 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1334 self._deploy_n2vc(
1335 logging_text=logging_text,
1336 db_nsr=db_nsr,
1337 db_vnfr=db_vnfr,
1338 nslcmop_id=nslcmop_id,
1339 nsr_id=nsr_id,
1340 nsi_id=nsi_id,
1341 vnfd_id=vnfd_id,
1342 vdu_id=vdu_id,
1343 kdu_name=kdu_name,
1344 member_vnf_index=member_vnf_index,
1345 vdu_index=vdu_index,
1346 vdu_name=vdu_name,
1347 deploy_params=deploy_params_vdu,
1348 descriptor_config=descriptor_config,
1349 base_folder=base_folder,
1350 task_instantiation_list=task_instantiation_list,
1351 task_instantiation_info=task_instantiation_info
1352 )
1353 for kdud in get_iterable(vnfd, 'kdu'):
1354 kdu_name = kdud["name"]
1355 descriptor_config = kdud.get('kdu-configuration')
1356 if descriptor_config and descriptor_config.get("juju"):
1357 vdu_id = None
1358 vdu_index = 0
1359 vdu_name = None
1360 # look for vdu index in the db_vnfr["vdu"] section
1361 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1362 # if vdur["vdu-id-ref"] == vdu_id:
1363 # break
1364 # else:
1365 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1366 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1367 # vdu_name = vdur.get("name")
1368 # vdu_name = None
1369
1370 self._deploy_n2vc(
1371 logging_text=logging_text,
1372 db_nsr=db_nsr,
1373 db_vnfr=db_vnfr,
1374 nslcmop_id=nslcmop_id,
1375 nsr_id=nsr_id,
1376 nsi_id=nsi_id,
1377 vnfd_id=vnfd_id,
1378 vdu_id=vdu_id,
1379 kdu_name=kdu_name,
1380 member_vnf_index=member_vnf_index,
1381 vdu_index=vdu_index,
1382 vdu_name=vdu_name,
1383 deploy_params=deploy_params,
1384 descriptor_config=descriptor_config,
1385 base_folder=base_folder,
1386 task_instantiation_list=task_instantiation_list,
1387 task_instantiation_info=task_instantiation_info
1388 )
1389
1390 # Check if this NS has a charm configuration
1391 descriptor_config = nsd.get("ns-configuration")
1392 if descriptor_config and descriptor_config.get("juju"):
1393 vnfd_id = None
1394 db_vnfr = None
1395 member_vnf_index = None
1396 vdu_id = None
1397 kdu_name = None
1398 vdu_index = 0
1399 vdu_name = None
1400
1401 # Get additional parameters
1402 deploy_params = {}
1403 if db_nsr.get("additionalParamsForNs"):
1404 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1405 base_folder = nsd["_admin"]["storage"]
1406 self._deploy_n2vc(
1407 logging_text=logging_text,
1408 db_nsr=db_nsr,
1409 db_vnfr=db_vnfr,
1410 nslcmop_id=nslcmop_id,
1411 nsr_id=nsr_id,
1412 nsi_id=nsi_id,
1413 vnfd_id=vnfd_id,
1414 vdu_id=vdu_id,
1415 kdu_name=kdu_name,
1416 member_vnf_index=member_vnf_index,
1417 vdu_index=vdu_index,
1418 vdu_name=vdu_name,
1419 deploy_params=deploy_params,
1420 descriptor_config=descriptor_config,
1421 base_folder=base_folder,
1422 task_instantiation_list=task_instantiation_list,
1423 task_instantiation_info=task_instantiation_info
1424 )
1425
1426 # Wait until all tasks of "task_instantiation_list" have been finished
1427
1428 # while time() <= start_deploy + self.total_deploy_timeout:
1429 error_text_list = []
1430 timeout = 3600 # time() - start_deploy
1431 if task_instantiation_list:
1432 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1433 if pending:
1434 for task in pending:
1435 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1436 for task in done:
1437 if task.cancelled():
1438 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1439 elif task.done():
1440 exc = task.exception()
1441 if exc:
1442 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1443 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1444 else:
1445 error_text_list.append(task_instantiation_info[task] + ": " + "".
1446 join(traceback.format_exception(None, exc, exc.__traceback__)))
1447
1448 if error_text_list:
1449 error_text = "\n".join(error_text_list)
1450 db_nsr_update["config-status"] = "failed"
1451 db_nsr_update["detailed-status"] = error_text
1452 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1453 db_nslcmop_update["detailed-status"] = error_text
1454 db_nslcmop_update["statusEnteredTime"] = time()
1455 else:
1456 # all is done
1457 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1458 db_nslcmop_update["statusEnteredTime"] = time()
1459 db_nslcmop_update["detailed-status"] = "done"
1460 db_nsr_update["config-status"] = "configured"
1461 db_nsr_update["detailed-status"] = "done"
1462
1463 except (ROclient.ROClientException, DbException, LcmException) as e:
1464 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1465 exc = e
1466 except asyncio.CancelledError:
1467 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1468 exc = "Operation was cancelled"
1469 except Exception as e:
1470 exc = traceback.format_exc()
1471 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1472 exc_info=True)
1473 finally:
1474 if exc:
1475 if db_nsr:
1476 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1477 db_nsr_update["operational-status"] = "failed"
1478 if db_nslcmop:
1479 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1480 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1481 db_nslcmop_update["statusEnteredTime"] = time()
1482 try:
1483 if db_nsr:
1484 db_nsr_update["_admin.nslcmop"] = None
1485 db_nsr_update["_admin.current-operation"] = None
1486 db_nsr_update["_admin.operation-type"] = None
1487 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1488 if db_nslcmop_update:
1489 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1490 except DbException as e:
1491 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1492 if nslcmop_operation_state:
1493 try:
1494 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1495 "operationState": nslcmop_operation_state},
1496 loop=self.loop)
1497 except Exception as e:
1498 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1499
1500 self.logger.debug(logging_text + "Exit")
1501 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1502
1503 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1504 # Launch kdus if present in the descriptor
1505
1506 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1507
1508 def _get_cluster_id(cluster_id, cluster_type):
1509 nonlocal k8scluster_id_2_uuic
1510 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1511 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1512
1513 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1514 if not db_k8scluster:
1515 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1516 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1517 if not k8s_id:
1518 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1519 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1520 return k8s_id
1521
1522 logging_text += "Deploy kdus: "
1523 try:
1524 db_nsr_update = {"_admin.deployed.K8s": []}
1525 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1526
1527 # Look for all vnfds
1528 pending_tasks = {}
1529 index = 0
1530 for vnfr_data in db_vnfrs.values():
1531 for kdur in get_iterable(vnfr_data, "kdur"):
1532 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1533 kdumodel = None
1534 k8sclustertype = None
1535 error_text = None
1536 cluster_uuid = None
1537 if kdur.get("helm-chart"):
1538 kdumodel = kdur["helm-chart"]
1539 k8sclustertype = "chart"
1540 k8sclustertype_full = "helm-chart"
1541 elif kdur.get("juju-bundle"):
1542 kdumodel = kdur["juju-bundle"]
1543 k8sclustertype = "juju"
1544 k8sclustertype_full = "juju-bundle"
1545 else:
1546 error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
1547 " running"
1548 try:
1549 if not error_text:
1550 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1551 except LcmException as e:
1552 error_text = str(e)
1553 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1554
1555 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1556 "k8scluster-type": k8sclustertype,
1557 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1558 if error_text:
1559 k8s_instace_info["detailed-status"] = error_text
1560 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1561 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1562 if error_text:
1563 continue
1564
1565 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1566 "{}".format(index)}
1567 if k8sclustertype == "chart":
1568 task = asyncio.ensure_future(
1569 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1570 params=desc_params, db_dict=db_dict, timeout=3600)
1571 )
1572 else:
1573 task = self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1574 atomic=True, params=desc_params,
1575 db_dict=db_dict, timeout=600)
1576
1577 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1578 index += 1
1579 if not pending_tasks:
1580 return
1581 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1582 pending_list = list(pending_tasks.keys())
1583 while pending_list:
1584 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1585 return_when=asyncio.FIRST_COMPLETED)
1586 if not done_list: # timeout
1587 for task in pending_list:
1588 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1589 break
1590 for task in done_list:
1591 exc = task.exception()
1592 if exc:
1593 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1594 else:
1595 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1596
1597 except Exception as e:
1598 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1599 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1600 finally:
1601 # TODO Write in data base
1602 if db_nsr_update:
1603 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1604
1605 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1606 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1607 base_folder, task_instantiation_list, task_instantiation_info):
1608 # launch instantiate_N2VC in a asyncio task and register task object
1609 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1610 # if not found, create one entry and update database
1611
1612 # fill db_nsr._admin.deployed.VCA.<index>
1613 vca_index = -1
1614 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1615 if not vca_deployed:
1616 continue
1617 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1618 vca_deployed.get("vdu_id") == vdu_id and \
1619 vca_deployed.get("kdu_name") == kdu_name and \
1620 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1621 break
1622 else:
1623 # not found, create one.
1624 vca_deployed = {
1625 "member-vnf-index": member_vnf_index,
1626 "vdu_id": vdu_id,
1627 "kdu_name": kdu_name,
1628 "vdu_count_index": vdu_index,
1629 "operational-status": "init", # TODO revise
1630 "detailed-status": "", # TODO revise
1631 "step": "initial-deploy", # TODO revise
1632 "vnfd_id": vnfd_id,
1633 "vdu_name": vdu_name,
1634 }
1635 vca_index += 1
1636 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1637 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1638
1639 # Launch task
1640 task_n2vc = asyncio.ensure_future(
1641 self.instantiate_N2VC(
1642 logging_text=logging_text,
1643 vca_index=vca_index,
1644 nsi_id=nsi_id,
1645 db_nsr=db_nsr,
1646 db_vnfr=db_vnfr,
1647 vdu_id=vdu_id,
1648 kdu_name=kdu_name,
1649 vdu_index=vdu_index,
1650 deploy_params=deploy_params,
1651 config_descriptor=descriptor_config,
1652 base_folder=base_folder,
1653 )
1654 )
1655 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1656 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1657 task_instantiation_list.append(task_n2vc)
1658
1659 # Check if this VNFD has a configured terminate action
1660 def _has_terminate_config_primitive(self, vnfd):
1661 vnf_config = vnfd.get("vnf-configuration")
1662 if vnf_config and vnf_config.get("terminate-config-primitive"):
1663 return True
1664 else:
1665 return False
1666
1667 @staticmethod
1668 def _get_terminate_config_primitive_seq_list(vnfd):
1669 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1670 # No need to check for existing primitive twice, already done before
1671 vnf_config = vnfd.get("vnf-configuration")
1672 seq_list = vnf_config.get("terminate-config-primitive")
1673 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1674 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1675 return seq_list_sorted
1676
1677 @staticmethod
1678 def _create_nslcmop(nsr_id, operation, params):
1679 """
1680 Creates a ns-lcm-opp content to be stored at database.
1681 :param nsr_id: internal id of the instance
1682 :param operation: instantiate, terminate, scale, action, ...
1683 :param params: user parameters for the operation
1684 :return: dictionary following SOL005 format
1685 """
1686 # Raise exception if invalid arguments
1687 if not (nsr_id and operation and params):
1688 raise LcmException(
1689 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1690 now = time()
1691 _id = str(uuid4())
1692 nslcmop = {
1693 "id": _id,
1694 "_id": _id,
1695 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1696 "operationState": "PROCESSING",
1697 "statusEnteredTime": now,
1698 "nsInstanceId": nsr_id,
1699 "lcmOperationType": operation,
1700 "startTime": now,
1701 "isAutomaticInvocation": False,
1702 "operationParams": params,
1703 "isCancelPending": False,
1704 "links": {
1705 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1706 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1707 }
1708 }
1709 return nslcmop
1710
1711 def _format_additional_params(self, params):
1712 params = params or {}
1713 for key, value in params.items():
1714 if str(value).startswith("!!yaml "):
1715 params[key] = yaml.safe_load(value[7:])
1716 return params
1717
1718 def _get_terminate_primitive_params(self, seq, vnf_index):
1719 primitive = seq.get('name')
1720 primitive_params = {}
1721 params = {
1722 "member_vnf_index": vnf_index,
1723 "primitive": primitive,
1724 "primitive_params": primitive_params,
1725 }
1726 desc_params = {}
1727 return self._map_primitive_params(seq, params, desc_params)
1728
1729 # sub-operations
1730
1731 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1732 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1733 if (op.get('operationState') == 'COMPLETED'):
1734 # b. Skip sub-operation
1735 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1736 return self.SUBOPERATION_STATUS_SKIP
1737 else:
1738 # c. Reintent executing sub-operation
1739 # The sub-operation exists, and operationState != 'COMPLETED'
1740 # Update operationState = 'PROCESSING' to indicate a reintent.
1741 operationState = 'PROCESSING'
1742 detailed_status = 'In progress'
1743 self._update_suboperation_status(
1744 db_nslcmop, op_index, operationState, detailed_status)
1745 # Return the sub-operation index
1746 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1747 # with arguments extracted from the sub-operation
1748 return op_index
1749
1750 # Find a sub-operation where all keys in a matching dictionary must match
1751 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1752 def _find_suboperation(self, db_nslcmop, match):
1753 if (db_nslcmop and match):
1754 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1755 for i, op in enumerate(op_list):
1756 if all(op.get(k) == match[k] for k in match):
1757 return i
1758 return self.SUBOPERATION_STATUS_NOT_FOUND
1759
1760 # Update status for a sub-operation given its index
1761 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1762 # Update DB for HA tasks
1763 q_filter = {'_id': db_nslcmop['_id']}
1764 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1765 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1766 self.db.set_one("nslcmops",
1767 q_filter=q_filter,
1768 update_dict=update_dict,
1769 fail_on_empty=False)
1770
1771 # Add sub-operation, return the index of the added sub-operation
1772 # Optionally, set operationState, detailed-status, and operationType
1773 # Status and type are currently set for 'scale' sub-operations:
1774 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1775 # 'detailed-status' : status message
1776 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1777 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1778 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1779 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1780 RO_nsr_id=None, RO_scaling_info=None):
1781 if not (db_nslcmop):
1782 return self.SUBOPERATION_STATUS_NOT_FOUND
1783 # Get the "_admin.operations" list, if it exists
1784 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1785 op_list = db_nslcmop_admin.get('operations')
1786 # Create or append to the "_admin.operations" list
1787 new_op = {'member_vnf_index': vnf_index,
1788 'vdu_id': vdu_id,
1789 'vdu_count_index': vdu_count_index,
1790 'primitive': primitive,
1791 'primitive_params': mapped_primitive_params}
1792 if operationState:
1793 new_op['operationState'] = operationState
1794 if detailed_status:
1795 new_op['detailed-status'] = detailed_status
1796 if operationType:
1797 new_op['lcmOperationType'] = operationType
1798 if RO_nsr_id:
1799 new_op['RO_nsr_id'] = RO_nsr_id
1800 if RO_scaling_info:
1801 new_op['RO_scaling_info'] = RO_scaling_info
1802 if not op_list:
1803 # No existing operations, create key 'operations' with current operation as first list element
1804 db_nslcmop_admin.update({'operations': [new_op]})
1805 op_list = db_nslcmop_admin.get('operations')
1806 else:
1807 # Existing operations, append operation to list
1808 op_list.append(new_op)
1809
1810 db_nslcmop_update = {'_admin.operations': op_list}
1811 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1812 op_index = len(op_list) - 1
1813 return op_index
1814
1815 # Helper methods for scale() sub-operations
1816
1817 # pre-scale/post-scale:
1818 # Check for 3 different cases:
1819 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1820 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1821 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1822 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1823 operationType, RO_nsr_id=None, RO_scaling_info=None):
1824 # Find this sub-operation
1825 if (RO_nsr_id and RO_scaling_info):
1826 operationType = 'SCALE-RO'
1827 match = {
1828 'member_vnf_index': vnf_index,
1829 'RO_nsr_id': RO_nsr_id,
1830 'RO_scaling_info': RO_scaling_info,
1831 }
1832 else:
1833 match = {
1834 'member_vnf_index': vnf_index,
1835 'primitive': vnf_config_primitive,
1836 'primitive_params': primitive_params,
1837 'lcmOperationType': operationType
1838 }
1839 op_index = self._find_suboperation(db_nslcmop, match)
1840 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1841 # a. New sub-operation
1842 # The sub-operation does not exist, add it.
1843 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1844 # The following parameters are set to None for all kind of scaling:
1845 vdu_id = None
1846 vdu_count_index = None
1847 vdu_name = None
1848 if (RO_nsr_id and RO_scaling_info):
1849 vnf_config_primitive = None
1850 primitive_params = None
1851 else:
1852 RO_nsr_id = None
1853 RO_scaling_info = None
1854 # Initial status for sub-operation
1855 operationState = 'PROCESSING'
1856 detailed_status = 'In progress'
1857 # Add sub-operation for pre/post-scaling (zero or more operations)
1858 self._add_suboperation(db_nslcmop,
1859 vnf_index,
1860 vdu_id,
1861 vdu_count_index,
1862 vdu_name,
1863 vnf_config_primitive,
1864 primitive_params,
1865 operationState,
1866 detailed_status,
1867 operationType,
1868 RO_nsr_id,
1869 RO_scaling_info)
1870 return self.SUBOPERATION_STATUS_NEW
1871 else:
1872 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1873 # or op_index (operationState != 'COMPLETED')
1874 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1875
1876 # Helper methods for terminate()
1877
1878 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1879 """ Create a primitive with params from VNFD
1880 Called from terminate() before deleting instance
1881 Calls action() to execute the primitive """
1882 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1883 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1884 db_vnfds = {}
1885 # Loop over VNFRs
1886 for vnfr in db_vnfrs_list:
1887 vnfd_id = vnfr["vnfd-id"]
1888 vnf_index = vnfr["member-vnf-index-ref"]
1889 if vnfd_id not in db_vnfds:
1890 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1891 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1892 db_vnfds[vnfd_id] = vnfd
1893 vnfd = db_vnfds[vnfd_id]
1894 if not self._has_terminate_config_primitive(vnfd):
1895 continue
1896 # Get the primitive's sorted sequence list
1897 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
1898 for seq in seq_list:
1899 # For each sequence in list, get primitive and call _ns_execute_primitive()
1900 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
1901 vnf_index, seq.get("name"))
1902 self.logger.debug(logging_text + step)
1903 # Create the primitive for each sequence, i.e. "primitive": "touch"
1904 primitive = seq.get('name')
1905 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
1906 # The following 3 parameters are currently set to None for 'terminate':
1907 # vdu_id, vdu_count_index, vdu_name
1908 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1909 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1910 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1911 # Add sub-operation
1912 self._add_suboperation(db_nslcmop,
1913 nslcmop_id,
1914 vnf_index,
1915 vdu_id,
1916 vdu_count_index,
1917 vdu_name,
1918 primitive,
1919 mapped_primitive_params)
1920 # Sub-operations: Call _ns_execute_primitive() instead of action()
1921 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1922 # nsr_deployed = db_nsr["_admin"]["deployed"]
1923
1924 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
1925 # nsr_id, nslcmop_terminate_action_id)
1926 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
1927 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
1928 # if result not in result_ok:
1929 # raise LcmException(
1930 # "terminate_primitive_action for vnf_member_index={}",
1931 # " primitive={} fails with error {}".format(
1932 # vnf_index, seq.get("name"), result_detail))
1933
1934 # TODO: find ee_id
1935 ee_id = None
1936 try:
1937 await self.n2vc.exec_primitive(
1938 ee_id=ee_id,
1939 primitive_name=primitive,
1940 params_dict=mapped_primitive_params
1941 )
1942 except Exception as e:
1943 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
1944 raise LcmException(
1945 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
1946 .format(vnf_index, seq.get("name"), e),
1947 )
1948
1949 async def terminate(self, nsr_id, nslcmop_id):
1950
1951 # Try to lock HA task here
1952 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1953 if not task_is_locked_by_me:
1954 return
1955
1956 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1957 self.logger.debug(logging_text + "Enter")
1958 db_nsr = None
1959 db_nslcmop = None
1960 exc = None
1961 failed_detail = [] # annotates all failed error messages
1962 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1963 "_admin.current-operation": nslcmop_id,
1964 "_admin.operation-type": "terminate"}
1965 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1966 db_nslcmop_update = {}
1967 nslcmop_operation_state = None
1968 autoremove = False # autoremove after terminated
1969 pending_tasks = []
1970 try:
1971 # wait for any previous tasks in process
1972 step = "Waiting for previous operations to terminate"
1973 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
1974
1975 step = "Getting nslcmop={} from db".format(nslcmop_id)
1976 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1977 step = "Getting nsr={} from db".format(nsr_id)
1978 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1979 # nsd = db_nsr["nsd"]
1980 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
1981 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1982 return
1983 # #TODO check if VIM is creating and wait
1984 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1985 # Call internal terminate action
1986 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
1987
1988 pending_tasks = []
1989
1990 db_nsr_update["operational-status"] = "terminating"
1991 db_nsr_update["config-status"] = "terminating"
1992
1993 # remove NS
1994 try:
1995 step = "delete execution environment"
1996 self.logger.debug(logging_text + step)
1997
1998 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
1999 pending_tasks.append(task_delete_ee)
2000 except Exception as e:
2001 msg = "Failed while deleting NS in VCA: {}".format(e)
2002 self.logger.error(msg)
2003 failed_detail.append(msg)
2004
2005 try:
2006 # Delete from k8scluster
2007 step = "delete kdus"
2008 self.logger.debug(logging_text + step)
2009 # print(nsr_deployed)
2010 if nsr_deployed:
2011 for kdu in nsr_deployed.get("K8s", ()):
2012 kdu_instance = kdu.get("kdu-instance")
2013 if not kdu_instance:
2014 continue
2015 if kdu.get("k8scluster-type") == "chart":
2016 task_delete_kdu_instance = asyncio.ensure_future(
2017 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2018 kdu_instance=kdu_instance))
2019 elif kdu.get("k8scluster-type") == "juju":
2020 task_delete_kdu_instance = asyncio.ensure_future(
2021 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2022 kdu_instance=kdu_instance))
2023 else:
2024 self.error(logging_text + "Unknown k8s deployment type {}".
2025 format(kdu.get("k8scluster-type")))
2026 continue
2027 pending_tasks.append(task_delete_kdu_instance)
2028 except LcmException as e:
2029 msg = "Failed while deleting KDUs from NS: {}".format(e)
2030 self.logger.error(msg)
2031 failed_detail.append(msg)
2032
2033 # remove from RO
2034 RO_fail = False
2035
2036 # Delete ns
2037 RO_nsr_id = RO_delete_action = None
2038 if nsr_deployed and nsr_deployed.get("RO"):
2039 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2040 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2041 try:
2042 if RO_nsr_id:
2043 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2044 "Deleting ns from VIM"
2045 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2046 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2047 self.logger.debug(logging_text + step)
2048 desc = await self.RO.delete("ns", RO_nsr_id)
2049 RO_delete_action = desc["action_id"]
2050 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2051 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2052 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2053 if RO_delete_action:
2054 # wait until NS is deleted from VIM
2055 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2056 format(RO_nsr_id, RO_delete_action)
2057 detailed_status_old = None
2058 self.logger.debug(logging_text + step)
2059
2060 delete_timeout = 20 * 60 # 20 minutes
2061 while delete_timeout > 0:
2062 desc = await self.RO.show(
2063 "ns",
2064 item_id_name=RO_nsr_id,
2065 extra_item="action",
2066 extra_item_id=RO_delete_action)
2067 ns_status, ns_status_info = self.RO.check_action_status(desc)
2068 if ns_status == "ERROR":
2069 raise ROclient.ROClientException(ns_status_info)
2070 elif ns_status == "BUILD":
2071 detailed_status = step + "; {}".format(ns_status_info)
2072 elif ns_status == "ACTIVE":
2073 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2074 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2075 break
2076 else:
2077 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2078 if detailed_status != detailed_status_old:
2079 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2080 db_nsr_update["detailed-status"] = detailed_status
2081 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2082 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2083 await asyncio.sleep(5, loop=self.loop)
2084 delete_timeout -= 5
2085 else: # delete_timeout <= 0:
2086 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2087
2088 except ROclient.ROClientException as e:
2089 if e.http_code == 404: # not found
2090 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2091 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2092 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2093 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2094 elif e.http_code == 409: # conflict
2095 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2096 self.logger.debug(logging_text + failed_detail[-1])
2097 RO_fail = True
2098 else:
2099 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2100 self.logger.error(logging_text + failed_detail[-1])
2101 RO_fail = True
2102
2103 # Delete nsd
2104 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2105 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2106 try:
2107 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2108 "Deleting nsd from RO"
2109 await self.RO.delete("nsd", RO_nsd_id)
2110 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2111 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2112 except ROclient.ROClientException as e:
2113 if e.http_code == 404: # not found
2114 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2115 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2116 elif e.http_code == 409: # conflict
2117 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2118 self.logger.debug(logging_text + failed_detail[-1])
2119 RO_fail = True
2120 else:
2121 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2122 self.logger.error(logging_text + failed_detail[-1])
2123 RO_fail = True
2124
2125 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2126 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2127 if not vnf_deployed or not vnf_deployed["id"]:
2128 continue
2129 try:
2130 RO_vnfd_id = vnf_deployed["id"]
2131 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2132 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2133 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2134 await self.RO.delete("vnfd", RO_vnfd_id)
2135 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2136 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2137 except ROclient.ROClientException as e:
2138 if e.http_code == 404: # not found
2139 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2140 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2141 elif e.http_code == 409: # conflict
2142 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2143 self.logger.debug(logging_text + failed_detail[-1])
2144 else:
2145 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2146 self.logger.error(logging_text + failed_detail[-1])
2147
2148 if failed_detail:
2149 self.logger.error(logging_text + " ;".join(failed_detail))
2150 db_nsr_update["operational-status"] = "failed"
2151 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2152 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2153 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2154 db_nslcmop_update["statusEnteredTime"] = time()
2155 else:
2156 db_nsr_update["operational-status"] = "terminated"
2157 db_nsr_update["detailed-status"] = "Done"
2158 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2159 db_nslcmop_update["detailed-status"] = "Done"
2160 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2161 db_nslcmop_update["statusEnteredTime"] = time()
2162 if db_nslcmop["operationParams"].get("autoremove"):
2163 autoremove = True
2164
2165 except (ROclient.ROClientException, DbException, LcmException) as e:
2166 self.logger.error(logging_text + "Exit Exception {}".format(e))
2167 exc = e
2168 except asyncio.CancelledError:
2169 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2170 exc = "Operation was cancelled"
2171 except Exception as e:
2172 exc = traceback.format_exc()
2173 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2174 finally:
2175 if exc and db_nslcmop:
2176 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2177 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2178 db_nslcmop_update["statusEnteredTime"] = time()
2179 try:
2180 if db_nslcmop and db_nslcmop_update:
2181 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2182 if db_nsr:
2183 db_nsr_update["_admin.nslcmop"] = None
2184 db_nsr_update["_admin.current-operation"] = None
2185 db_nsr_update["_admin.operation-type"] = None
2186 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2187 except DbException as e:
2188 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2189 if nslcmop_operation_state:
2190 try:
2191 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2192 "operationState": nslcmop_operation_state,
2193 "autoremove": autoremove},
2194 loop=self.loop)
2195 except Exception as e:
2196 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2197
2198 # wait for pending tasks
2199 done = None
2200 pending = None
2201 if pending_tasks:
2202 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2203 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2204 if not pending:
2205 self.logger.debug(logging_text + 'All tasks finished...')
2206 else:
2207 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2208
2209 self.logger.debug(logging_text + "Exit")
2210 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2211
2212 @staticmethod
2213 def _map_primitive_params(primitive_desc, params, instantiation_params):
2214 """
2215 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2216 The default-value is used. If it is between < > it look for a value at instantiation_params
2217 :param primitive_desc: portion of VNFD/NSD that describes primitive
2218 :param params: Params provided by user
2219 :param instantiation_params: Instantiation params provided by user
2220 :return: a dictionary with the calculated params
2221 """
2222 calculated_params = {}
2223 for parameter in primitive_desc.get("parameter", ()):
2224 param_name = parameter["name"]
2225 if param_name in params:
2226 calculated_params[param_name] = params[param_name]
2227 elif "default-value" in parameter or "value" in parameter:
2228 if "value" in parameter:
2229 calculated_params[param_name] = parameter["value"]
2230 else:
2231 calculated_params[param_name] = parameter["default-value"]
2232 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2233 and calculated_params[param_name].endswith(">"):
2234 if calculated_params[param_name][1:-1] in instantiation_params:
2235 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2236 else:
2237 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2238 format(calculated_params[param_name], primitive_desc["name"]))
2239 else:
2240 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2241 format(param_name, primitive_desc["name"]))
2242
2243 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2244 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2245 width=256)
2246 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2247 calculated_params[param_name] = calculated_params[param_name][7:]
2248
2249 # add always ns_config_info if primitive name is config
2250 if primitive_desc["name"] == "config":
2251 if "ns_config_info" in instantiation_params:
2252 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2253 return calculated_params
2254
2255 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2256 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2257
2258 # find vca_deployed record for this action
2259 try:
2260 for vca_deployed in db_deployed["VCA"]:
2261 if not vca_deployed:
2262 continue
2263 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2264 continue
2265 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2266 continue
2267 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2268 continue
2269 break
2270 else:
2271 # vca_deployed not found
2272 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2273 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2274
2275 # get ee_id
2276 ee_id = vca_deployed.get("ee_id")
2277 if not ee_id:
2278 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2279 "execution environment"
2280 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2281
2282 if primitive == "config":
2283 primitive_params = {"params": primitive_params}
2284
2285 while retries >= 0:
2286 try:
2287 output = await self.n2vc.exec_primitive(
2288 ee_id=ee_id,
2289 primitive_name=primitive,
2290 params_dict=primitive_params
2291 )
2292 # execution was OK
2293 break
2294 except Exception as e:
2295 retries -= 1
2296 if retries >= 0:
2297 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2298 # wait and retry
2299 await asyncio.sleep(retries_interval, loop=self.loop)
2300 else:
2301 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2302
2303 return output, 'OK'
2304
2305 except Exception as e:
2306 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2307
2308 async def action(self, nsr_id, nslcmop_id):
2309
2310 # Try to lock HA task here
2311 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2312 if not task_is_locked_by_me:
2313 return
2314
2315 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2316 self.logger.debug(logging_text + "Enter")
2317 # get all needed from database
2318 db_nsr = None
2319 db_nslcmop = None
2320 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2321 "_admin.current-operation": nslcmop_id,
2322 "_admin.operation-type": "action"}
2323 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2324 db_nslcmop_update = {}
2325 nslcmop_operation_state = None
2326 nslcmop_operation_state_detail = None
2327 exc = None
2328 try:
2329 # wait for any previous tasks in process
2330 step = "Waiting for previous operations to terminate"
2331 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2332
2333 step = "Getting information from database"
2334 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2335 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2336
2337 nsr_deployed = db_nsr["_admin"].get("deployed")
2338 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2339 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2340 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2341 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2342 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2343
2344 if vnf_index:
2345 step = "Getting vnfr from database"
2346 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2347 step = "Getting vnfd from database"
2348 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2349 else:
2350 if db_nsr.get("nsd"):
2351 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2352 else:
2353 step = "Getting nsd from database"
2354 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2355
2356 # for backward compatibility
2357 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2358 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2359 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2360 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2361
2362 primitive = db_nslcmop["operationParams"]["primitive"]
2363 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2364
2365 # look for primitive
2366 config_primitive_desc = None
2367 if vdu_id:
2368 for vdu in get_iterable(db_vnfd, "vdu"):
2369 if vdu_id == vdu["id"]:
2370 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2371 if config_primitive["name"] == primitive:
2372 config_primitive_desc = config_primitive
2373 break
2374 elif kdu_name:
2375 self.logger.debug(logging_text + "Checking actions in KDUs")
2376 kdur = next((x for x in db_vnfr["kdur"] if x["kdu_name"] == kdu_name), None)
2377 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2378 if primitive_params:
2379 desc_params.update(primitive_params)
2380 # TODO Check if we will need something at vnf level
2381 index = 0
2382 for kdu in get_iterable(nsr_deployed, "K8s"):
2383 if kdu_name == kdu["kdu-name"]:
2384 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2385 "path": "_admin.deployed.K8s.{}".format(index)}
2386 if primitive == "upgrade":
2387 if desc_params.get("kdu_model"):
2388 kdu_model = desc_params.get("kdu_model")
2389 del desc_params["kdu_model"]
2390 else:
2391 kdu_model = kdu.get("kdu-model")
2392 parts = kdu_model.split(sep=":")
2393 if len(parts) == 2:
2394 kdu_model = parts[0]
2395
2396 if kdu.get("k8scluster-type") == "chart":
2397 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2398 kdu_instance=kdu.get("kdu-instance"),
2399 atomic=True, kdu_model=kdu_model,
2400 params=desc_params, db_dict=db_dict,
2401 timeout=300)
2402 elif kdu.get("k8scluster-type") == "juju":
2403 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2404 kdu_instance=kdu.get("kdu-instance"),
2405 atomic=True, kdu_model=kdu_model,
2406 params=desc_params, db_dict=db_dict,
2407 timeout=300)
2408
2409 else:
2410 msg = "k8scluster-type not defined"
2411 raise LcmException(msg)
2412
2413 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2414 break
2415 elif primitive == "rollback":
2416 if kdu.get("k8scluster-type") == "chart":
2417 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2418 kdu_instance=kdu.get("kdu-instance"),
2419 db_dict=db_dict)
2420 elif kdu.get("k8scluster-type") == "juju":
2421 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2422 kdu_instance=kdu.get("kdu-instance"),
2423 db_dict=db_dict)
2424 else:
2425 msg = "k8scluster-type not defined"
2426 raise LcmException(msg)
2427 break
2428 elif primitive == "status":
2429 if kdu.get("k8scluster-type") == "chart":
2430 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2431 kdu_instance=kdu.get("kdu-instance"))
2432 elif kdu.get("k8scluster-type") == "juju":
2433 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2434 kdu_instance=kdu.get("kdu-instance"))
2435 else:
2436 msg = "k8scluster-type not defined"
2437 raise LcmException(msg)
2438 break
2439 index += 1
2440
2441 else:
2442 raise LcmException("KDU '{}' not found".format(kdu_name))
2443 if output:
2444 db_nslcmop_update["detailed-status"] = output
2445 db_nslcmop_update["operationState"] = 'COMPLETED'
2446 db_nslcmop_update["statusEnteredTime"] = time()
2447 else:
2448 db_nslcmop_update["detailed-status"] = ''
2449 db_nslcmop_update["operationState"] = 'FAILED'
2450 db_nslcmop_update["statusEnteredTime"] = time()
2451 return
2452 elif vnf_index:
2453 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2454 if config_primitive["name"] == primitive:
2455 config_primitive_desc = config_primitive
2456 break
2457 else:
2458 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2459 if config_primitive["name"] == primitive:
2460 config_primitive_desc = config_primitive
2461 break
2462
2463 if not config_primitive_desc:
2464 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2465 format(primitive))
2466
2467 desc_params = {}
2468 if vnf_index:
2469 if db_vnfr.get("additionalParamsForVnf"):
2470 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2471 if vdu_id:
2472 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2473 if vdur.get("additionalParams"):
2474 desc_params = self._format_additional_params(vdur["additionalParams"])
2475 else:
2476 if db_nsr.get("additionalParamsForNs"):
2477 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2478
2479 # TODO check if ns is in a proper status
2480 output, detail = await self._ns_execute_primitive(
2481 db_deployed=nsr_deployed,
2482 member_vnf_index=vnf_index,
2483 vdu_id=vdu_id,
2484 vdu_name=vdu_name,
2485 vdu_count_index=vdu_count_index,
2486 primitive=primitive,
2487 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2488
2489 detailed_status = output
2490 if detail == 'OK':
2491 result = 'COMPLETED'
2492 else:
2493 result = 'FAILED'
2494
2495 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2496 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2497 db_nslcmop_update["statusEnteredTime"] = time()
2498 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2499 return # database update is called inside finally
2500
2501 except (DbException, LcmException) as e:
2502 self.logger.error(logging_text + "Exit Exception {}".format(e))
2503 exc = e
2504 except asyncio.CancelledError:
2505 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2506 exc = "Operation was cancelled"
2507 except Exception as e:
2508 exc = traceback.format_exc()
2509 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2510 finally:
2511 if exc and db_nslcmop:
2512 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2513 "FAILED {}: {}".format(step, exc)
2514 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2515 db_nslcmop_update["statusEnteredTime"] = time()
2516 try:
2517 if db_nslcmop_update:
2518 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2519 if db_nsr:
2520 db_nsr_update["_admin.nslcmop"] = None
2521 db_nsr_update["_admin.operation-type"] = None
2522 db_nsr_update["_admin.nslcmop"] = None
2523 db_nsr_update["_admin.current-operation"] = None
2524 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2525 except DbException as e:
2526 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2527 self.logger.debug(logging_text + "Exit")
2528 if nslcmop_operation_state:
2529 try:
2530 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2531 "operationState": nslcmop_operation_state},
2532 loop=self.loop)
2533 except Exception as e:
2534 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2535 self.logger.debug(logging_text + "Exit")
2536 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2537 return nslcmop_operation_state, nslcmop_operation_state_detail
2538
2539 async def scale(self, nsr_id, nslcmop_id):
2540
2541 # Try to lock HA task here
2542 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2543 if not task_is_locked_by_me:
2544 return
2545
2546 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2547 self.logger.debug(logging_text + "Enter")
2548 # get all needed from database
2549 db_nsr = None
2550 db_nslcmop = None
2551 db_nslcmop_update = {}
2552 nslcmop_operation_state = None
2553 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2554 "_admin.current-operation": nslcmop_id,
2555 "_admin.operation-type": "scale"}
2556 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2557 exc = None
2558 # in case of error, indicates what part of scale was failed to put nsr at error status
2559 scale_process = None
2560 old_operational_status = ""
2561 old_config_status = ""
2562 vnfr_scaled = False
2563 try:
2564 # wait for any previous tasks in process
2565 step = "Waiting for previous operations to terminate"
2566 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2567
2568 step = "Getting nslcmop from database"
2569 self.logger.debug(step + " after having waited for previous tasks to be completed")
2570 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2571 step = "Getting nsr from database"
2572 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2573
2574 old_operational_status = db_nsr["operational-status"]
2575 old_config_status = db_nsr["config-status"]
2576 step = "Parsing scaling parameters"
2577 # self.logger.debug(step)
2578 db_nsr_update["operational-status"] = "scaling"
2579 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2580 nsr_deployed = db_nsr["_admin"].get("deployed")
2581
2582 #######
2583 nsr_deployed = db_nsr["_admin"].get("deployed")
2584 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2585 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2586 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2587 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2588 #######
2589
2590 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2591 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2592 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2593 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2594 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2595
2596 # for backward compatibility
2597 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2598 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2599 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2600 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2601
2602 step = "Getting vnfr from database"
2603 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2604 step = "Getting vnfd from database"
2605 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2606
2607 step = "Getting scaling-group-descriptor"
2608 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2609 if scaling_descriptor["name"] == scaling_group:
2610 break
2611 else:
2612 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2613 "at vnfd:scaling-group-descriptor".format(scaling_group))
2614
2615 # cooldown_time = 0
2616 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2617 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2618 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2619 # break
2620
2621 # TODO check if ns is in a proper status
2622 step = "Sending scale order to VIM"
2623 nb_scale_op = 0
2624 if not db_nsr["_admin"].get("scaling-group"):
2625 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2626 admin_scale_index = 0
2627 else:
2628 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2629 if admin_scale_info["name"] == scaling_group:
2630 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2631 break
2632 else: # not found, set index one plus last element and add new entry with the name
2633 admin_scale_index += 1
2634 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2635 RO_scaling_info = []
2636 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2637 if scaling_type == "SCALE_OUT":
2638 # count if max-instance-count is reached
2639 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2640 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2641 if nb_scale_op >= max_instance_count:
2642 raise LcmException("reached the limit of {} (max-instance-count) "
2643 "scaling-out operations for the "
2644 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2645
2646 nb_scale_op += 1
2647 vdu_scaling_info["scaling_direction"] = "OUT"
2648 vdu_scaling_info["vdu-create"] = {}
2649 for vdu_scale_info in scaling_descriptor["vdu"]:
2650 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2651 "type": "create", "count": vdu_scale_info.get("count", 1)})
2652 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2653
2654 elif scaling_type == "SCALE_IN":
2655 # count if min-instance-count is reached
2656 min_instance_count = 0
2657 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2658 min_instance_count = int(scaling_descriptor["min-instance-count"])
2659 if nb_scale_op <= min_instance_count:
2660 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2661 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2662 nb_scale_op -= 1
2663 vdu_scaling_info["scaling_direction"] = "IN"
2664 vdu_scaling_info["vdu-delete"] = {}
2665 for vdu_scale_info in scaling_descriptor["vdu"]:
2666 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2667 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2668 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2669
2670 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2671 vdu_create = vdu_scaling_info.get("vdu-create")
2672 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2673 if vdu_scaling_info["scaling_direction"] == "IN":
2674 for vdur in reversed(db_vnfr["vdur"]):
2675 if vdu_delete.get(vdur["vdu-id-ref"]):
2676 vdu_delete[vdur["vdu-id-ref"]] -= 1
2677 vdu_scaling_info["vdu"].append({
2678 "name": vdur["name"],
2679 "vdu_id": vdur["vdu-id-ref"],
2680 "interface": []
2681 })
2682 for interface in vdur["interfaces"]:
2683 vdu_scaling_info["vdu"][-1]["interface"].append({
2684 "name": interface["name"],
2685 "ip_address": interface["ip-address"],
2686 "mac_address": interface.get("mac-address"),
2687 })
2688 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2689
2690 # PRE-SCALE BEGIN
2691 step = "Executing pre-scale vnf-config-primitive"
2692 if scaling_descriptor.get("scaling-config-action"):
2693 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2694 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2695 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2696 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2697 step = db_nslcmop_update["detailed-status"] = \
2698 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2699
2700 # look for primitive
2701 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2702 if config_primitive["name"] == vnf_config_primitive:
2703 break
2704 else:
2705 raise LcmException(
2706 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2707 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2708 "primitive".format(scaling_group, config_primitive))
2709
2710 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2711 if db_vnfr.get("additionalParamsForVnf"):
2712 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2713
2714 scale_process = "VCA"
2715 db_nsr_update["config-status"] = "configuring pre-scaling"
2716 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2717
2718 # Pre-scale reintent check: Check if this sub-operation has been executed before
2719 op_index = self._check_or_add_scale_suboperation(
2720 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2721 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2722 # Skip sub-operation
2723 result = 'COMPLETED'
2724 result_detail = 'Done'
2725 self.logger.debug(logging_text +
2726 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2727 vnf_config_primitive, result, result_detail))
2728 else:
2729 if (op_index == self.SUBOPERATION_STATUS_NEW):
2730 # New sub-operation: Get index of this sub-operation
2731 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2732 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2733 format(vnf_config_primitive))
2734 else:
2735 # Reintent: Get registered params for this existing sub-operation
2736 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2737 vnf_index = op.get('member_vnf_index')
2738 vnf_config_primitive = op.get('primitive')
2739 primitive_params = op.get('primitive_params')
2740 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2741 format(vnf_config_primitive))
2742 # Execute the primitive, either with new (first-time) or registered (reintent) args
2743 result, result_detail = await self._ns_execute_primitive(
2744 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2745 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2746 vnf_config_primitive, result, result_detail))
2747 # Update operationState = COMPLETED | FAILED
2748 self._update_suboperation_status(
2749 db_nslcmop, op_index, result, result_detail)
2750
2751 if result == "FAILED":
2752 raise LcmException(result_detail)
2753 db_nsr_update["config-status"] = old_config_status
2754 scale_process = None
2755 # PRE-SCALE END
2756
2757 # SCALE RO - BEGIN
2758 # Should this block be skipped if 'RO_nsr_id' == None ?
2759 # if (RO_nsr_id and RO_scaling_info):
2760 if RO_scaling_info:
2761 scale_process = "RO"
2762 # Scale RO reintent check: Check if this sub-operation has been executed before
2763 op_index = self._check_or_add_scale_suboperation(
2764 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2765 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2766 # Skip sub-operation
2767 result = 'COMPLETED'
2768 result_detail = 'Done'
2769 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2770 result, result_detail))
2771 else:
2772 if (op_index == self.SUBOPERATION_STATUS_NEW):
2773 # New sub-operation: Get index of this sub-operation
2774 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2775 self.logger.debug(logging_text + "New sub-operation RO")
2776 else:
2777 # Reintent: Get registered params for this existing sub-operation
2778 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2779 RO_nsr_id = op.get('RO_nsr_id')
2780 RO_scaling_info = op.get('RO_scaling_info')
2781 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2782 vnf_config_primitive))
2783
2784 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2785 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2786 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2787 # wait until ready
2788 RO_nslcmop_id = RO_desc["instance_action_id"]
2789 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2790
2791 RO_task_done = False
2792 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2793 detailed_status_old = None
2794 self.logger.debug(logging_text + step)
2795
2796 deployment_timeout = 1 * 3600 # One hour
2797 while deployment_timeout > 0:
2798 if not RO_task_done:
2799 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2800 extra_item_id=RO_nslcmop_id)
2801 ns_status, ns_status_info = self.RO.check_action_status(desc)
2802 if ns_status == "ERROR":
2803 raise ROclient.ROClientException(ns_status_info)
2804 elif ns_status == "BUILD":
2805 detailed_status = step + "; {}".format(ns_status_info)
2806 elif ns_status == "ACTIVE":
2807 RO_task_done = True
2808 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2809 self.logger.debug(logging_text + step)
2810 else:
2811 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2812 else:
2813
2814 if ns_status == "ERROR":
2815 raise ROclient.ROClientException(ns_status_info)
2816 elif ns_status == "BUILD":
2817 detailed_status = step + "; {}".format(ns_status_info)
2818 elif ns_status == "ACTIVE":
2819 step = detailed_status = \
2820 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2821 if not vnfr_scaled:
2822 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2823 vnfr_scaled = True
2824 try:
2825 desc = await self.RO.show("ns", RO_nsr_id)
2826 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2827 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2828 break
2829 except LcmExceptionNoMgmtIP:
2830 pass
2831 else:
2832 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2833 if detailed_status != detailed_status_old:
2834 self._update_suboperation_status(
2835 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2836 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2837 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2838
2839 await asyncio.sleep(5, loop=self.loop)
2840 deployment_timeout -= 5
2841 if deployment_timeout <= 0:
2842 self._update_suboperation_status(
2843 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2844 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2845
2846 # update VDU_SCALING_INFO with the obtained ip_addresses
2847 if vdu_scaling_info["scaling_direction"] == "OUT":
2848 for vdur in reversed(db_vnfr["vdur"]):
2849 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
2850 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
2851 vdu_scaling_info["vdu"].append({
2852 "name": vdur["name"],
2853 "vdu_id": vdur["vdu-id-ref"],
2854 "interface": []
2855 })
2856 for interface in vdur["interfaces"]:
2857 vdu_scaling_info["vdu"][-1]["interface"].append({
2858 "name": interface["name"],
2859 "ip_address": interface["ip-address"],
2860 "mac_address": interface.get("mac-address"),
2861 })
2862 del vdu_scaling_info["vdu-create"]
2863
2864 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
2865 # SCALE RO - END
2866
2867 scale_process = None
2868 if db_nsr_update:
2869 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2870
2871 # POST-SCALE BEGIN
2872 # execute primitive service POST-SCALING
2873 step = "Executing post-scale vnf-config-primitive"
2874 if scaling_descriptor.get("scaling-config-action"):
2875 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2876 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
2877 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
2878 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2879 step = db_nslcmop_update["detailed-status"] = \
2880 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
2881
2882 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2883 if db_vnfr.get("additionalParamsForVnf"):
2884 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2885
2886 # look for primitive
2887 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2888 if config_primitive["name"] == vnf_config_primitive:
2889 break
2890 else:
2891 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
2892 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
2893 "match any vnf-configuration:config-primitive".format(scaling_group,
2894 config_primitive))
2895 scale_process = "VCA"
2896 db_nsr_update["config-status"] = "configuring post-scaling"
2897 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2898
2899 # Post-scale reintent check: Check if this sub-operation has been executed before
2900 op_index = self._check_or_add_scale_suboperation(
2901 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
2902 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2903 # Skip sub-operation
2904 result = 'COMPLETED'
2905 result_detail = 'Done'
2906 self.logger.debug(logging_text +
2907 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
2908 format(vnf_config_primitive, result, result_detail))
2909 else:
2910 if (op_index == self.SUBOPERATION_STATUS_NEW):
2911 # New sub-operation: Get index of this sub-operation
2912 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2913 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2914 format(vnf_config_primitive))
2915 else:
2916 # Reintent: Get registered params for this existing sub-operation
2917 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2918 vnf_index = op.get('member_vnf_index')
2919 vnf_config_primitive = op.get('primitive')
2920 primitive_params = op.get('primitive_params')
2921 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2922 format(vnf_config_primitive))
2923 # Execute the primitive, either with new (first-time) or registered (reintent) args
2924 result, result_detail = await self._ns_execute_primitive(
2925 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2926 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2927 vnf_config_primitive, result, result_detail))
2928 # Update operationState = COMPLETED | FAILED
2929 self._update_suboperation_status(
2930 db_nslcmop, op_index, result, result_detail)
2931
2932 if result == "FAILED":
2933 raise LcmException(result_detail)
2934 db_nsr_update["config-status"] = old_config_status
2935 scale_process = None
2936 # POST-SCALE END
2937
2938 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2939 db_nslcmop_update["statusEnteredTime"] = time()
2940 db_nslcmop_update["detailed-status"] = "done"
2941 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
2942 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
2943 else old_operational_status
2944 db_nsr_update["config-status"] = old_config_status
2945 return
2946 except (ROclient.ROClientException, DbException, LcmException) as e:
2947 self.logger.error(logging_text + "Exit Exception {}".format(e))
2948 exc = e
2949 except asyncio.CancelledError:
2950 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2951 exc = "Operation was cancelled"
2952 except Exception as e:
2953 exc = traceback.format_exc()
2954 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2955 finally:
2956 if exc:
2957 if db_nslcmop:
2958 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2959 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2960 db_nslcmop_update["statusEnteredTime"] = time()
2961 if db_nsr:
2962 db_nsr_update["operational-status"] = old_operational_status
2963 db_nsr_update["config-status"] = old_config_status
2964 db_nsr_update["detailed-status"] = ""
2965 db_nsr_update["_admin.nslcmop"] = None
2966 if scale_process:
2967 if "VCA" in scale_process:
2968 db_nsr_update["config-status"] = "failed"
2969 if "RO" in scale_process:
2970 db_nsr_update["operational-status"] = "failed"
2971 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
2972 exc)
2973 try:
2974 if db_nslcmop and db_nslcmop_update:
2975 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2976 if db_nsr:
2977 db_nsr_update["_admin.current-operation"] = None
2978 db_nsr_update["_admin.operation-type"] = None
2979 db_nsr_update["_admin.nslcmop"] = None
2980 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2981 except DbException as e:
2982 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2983 if nslcmop_operation_state:
2984 try:
2985 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2986 "operationState": nslcmop_operation_state},
2987 loop=self.loop)
2988 # if cooldown_time:
2989 # await asyncio.sleep(cooldown_time, loop=self.loop)
2990 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
2991 except Exception as e:
2992 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2993 self.logger.debug(logging_text + "Exit")
2994 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")