fix 931: No need to wait for VM up on ns configuration
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
25
26 from osm_lcm import ROclient
27 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
28 from n2vc.k8s_helm_conn import K8sHelmConnector
29 from n2vc.k8s_juju_conn import K8sJujuConnector
30
31 from osm_common.dbbase import DbException
32 from osm_common.fsbase import FsException
33
34 from n2vc.n2vc_juju_conn import N2VCJujuConnector
35 from n2vc.exceptions import N2VCException
36
37 from copy import copy, deepcopy
38 from http import HTTPStatus
39 from time import time
40 from uuid import uuid4
41
42 __author__ = "Alfonso Tierno"
43
44
45 def get_iterable(in_dict, in_key):
46 """
47 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
48 :param in_dict: a dictionary
49 :param in_key: the key to look for at in_dict
50 :return: in_dict[in_var] or () if it is None or not present
51 """
52 if not in_dict.get(in_key):
53 return ()
54 return in_dict[in_key]
55
56
57 def populate_dict(target_dict, key_list, value):
58 """
59 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
60 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
61 :param target_dict: dictionary to be changed
62 :param key_list: list of keys to insert at target_dict
63 :param value:
64 :return: None
65 """
66 for key in key_list[0:-1]:
67 if key not in target_dict:
68 target_dict[key] = {}
69 target_dict = target_dict[key]
70 target_dict[key_list[-1]] = value
71
72
73 class NsLcm(LcmBase):
74 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
75 total_deploy_timeout = 2 * 3600 # global timeout for deployment
76 timeout_charm_delete = 10 * 60
77 timeout_primitive = 10 * 60 # timeout for primitive execution
78
79 SUBOPERATION_STATUS_NOT_FOUND = -1
80 SUBOPERATION_STATUS_NEW = -2
81 SUBOPERATION_STATUS_SKIP = -3
82
83 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
84 """
85 Init, Connect to database, filesystem storage, and messaging
86 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
87 :return: None
88 """
89 super().__init__(
90 db=db,
91 msg=msg,
92 fs=fs,
93 logger=logging.getLogger('lcm.ns')
94 )
95
96 self.loop = loop
97 self.lcm_tasks = lcm_tasks
98 self.ro_config = ro_config
99 self.vca_config = vca_config
100 if 'pubkey' in self.vca_config:
101 self.vca_config['public_key'] = self.vca_config['pubkey']
102 if 'cacert' in self.vca_config:
103 self.vca_config['ca_cert'] = self.vca_config['cacert']
104 if 'apiproxy' in self.vca_config:
105 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
106
107 # create N2VC connector
108 self.n2vc = N2VCJujuConnector(
109 db=self.db,
110 fs=self.fs,
111 log=self.logger,
112 loop=self.loop,
113 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
114 username=self.vca_config.get('user', None),
115 vca_config=self.vca_config,
116 on_update_db=self._on_update_n2vc_db,
117 # ca_cert=self.vca_config.get('cacert'),
118 # api_proxy=self.vca_config.get('apiproxy'),
119 )
120
121 self.k8sclusterhelm = K8sHelmConnector(
122 kubectl_command=self.vca_config.get("kubectlpath"),
123 helm_command=self.vca_config.get("helmpath"),
124 fs=self.fs,
125 log=self.logger,
126 db=self.db,
127 on_update_db=None,
128 )
129
130 self.k8sclusterjuju = K8sJujuConnector(
131 kubectl_command=self.vca_config.get("kubectlpath"),
132 juju_command=self.vca_config.get("jujupath"),
133 fs=self.fs,
134 log=self.logger,
135 db=self.db,
136 on_update_db=None,
137 )
138
139 # create RO client
140 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
141
142 def _on_update_n2vc_db(self, table, filter, path, updated_data):
143
144 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
145 .format(table, filter, path, updated_data))
146
147 return
148 # write NS status to database
149 # try:
150 # # nsrs_id = filter.get('_id')
151 # # print(nsrs_id)
152 # # get ns record
153 # nsr = self.db.get_one(table=table, q_filter=filter)
154 # # get VCA deployed list
155 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
156 # # get RO deployed
157 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
158 # for vca in vca_list:
159 # # status = vca.get('status')
160 # # print(status)
161 # # detailed_status = vca.get('detailed-status')
162 # # print(detailed_status)
163 # # for ro in ro_list:
164 # # print(ro)
165 #
166 # except Exception as e:
167 # self.logger.error('Error writing NS status to db: {}'.format(e))
168
169 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
170 """
171 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
172 :param vnfd: input vnfd
173 :param new_id: overrides vnf id if provided
174 :param additionalParams: Instantiation params for VNFs provided
175 :param nsrId: Id of the NSR
176 :return: copy of vnfd
177 """
178 try:
179 vnfd_RO = deepcopy(vnfd)
180 # remove unused by RO configuration, monitoring, scaling and internal keys
181 vnfd_RO.pop("_id", None)
182 vnfd_RO.pop("_admin", None)
183 vnfd_RO.pop("vnf-configuration", None)
184 vnfd_RO.pop("monitoring-param", None)
185 vnfd_RO.pop("scaling-group-descriptor", None)
186 vnfd_RO.pop("kdu", None)
187 vnfd_RO.pop("k8s-cluster", None)
188 if new_id:
189 vnfd_RO["id"] = new_id
190
191 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
192 for vdu in get_iterable(vnfd_RO, "vdu"):
193 cloud_init_file = None
194 if vdu.get("cloud-init-file"):
195 base_folder = vnfd["_admin"]["storage"]
196 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
197 vdu["cloud-init-file"])
198 with self.fs.file_open(cloud_init_file, "r") as ci_file:
199 cloud_init_content = ci_file.read()
200 vdu.pop("cloud-init-file", None)
201 elif vdu.get("cloud-init"):
202 cloud_init_content = vdu["cloud-init"]
203 else:
204 continue
205
206 env = Environment()
207 ast = env.parse(cloud_init_content)
208 mandatory_vars = meta.find_undeclared_variables(ast)
209 if mandatory_vars:
210 for var in mandatory_vars:
211 if not additionalParams or var not in additionalParams.keys():
212 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
213 "file, must be provided in the instantiation parameters inside the "
214 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
215 template = Template(cloud_init_content)
216 cloud_init_content = template.render(additionalParams or {})
217 vdu["cloud-init"] = cloud_init_content
218
219 return vnfd_RO
220 except FsException as e:
221 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
222 format(vnfd["id"], vdu["id"], cloud_init_file, e))
223 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
224 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
225 format(vnfd["id"], vdu["id"], e))
226
227 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
228 """
229 Creates a RO ns descriptor from OSM ns_instantiate params
230 :param ns_params: OSM instantiate params
231 :return: The RO ns descriptor
232 """
233 vim_2_RO = {}
234 wim_2_RO = {}
235 # TODO feature 1417: Check that no instantiation is set over PDU
236 # check if PDU forces a concrete vim-network-id and add it
237 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
238
239 def vim_account_2_RO(vim_account):
240 if vim_account in vim_2_RO:
241 return vim_2_RO[vim_account]
242
243 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
244 if db_vim["_admin"]["operationalState"] != "ENABLED":
245 raise LcmException("VIM={} is not available. operationalState={}".format(
246 vim_account, db_vim["_admin"]["operationalState"]))
247 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
248 vim_2_RO[vim_account] = RO_vim_id
249 return RO_vim_id
250
251 def wim_account_2_RO(wim_account):
252 if isinstance(wim_account, str):
253 if wim_account in wim_2_RO:
254 return wim_2_RO[wim_account]
255
256 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
257 if db_wim["_admin"]["operationalState"] != "ENABLED":
258 raise LcmException("WIM={} is not available. operationalState={}".format(
259 wim_account, db_wim["_admin"]["operationalState"]))
260 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
261 wim_2_RO[wim_account] = RO_wim_id
262 return RO_wim_id
263 else:
264 return wim_account
265
266 def ip_profile_2_RO(ip_profile):
267 RO_ip_profile = deepcopy((ip_profile))
268 if "dns-server" in RO_ip_profile:
269 if isinstance(RO_ip_profile["dns-server"], list):
270 RO_ip_profile["dns-address"] = []
271 for ds in RO_ip_profile.pop("dns-server"):
272 RO_ip_profile["dns-address"].append(ds['address'])
273 else:
274 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
275 if RO_ip_profile.get("ip-version") == "ipv4":
276 RO_ip_profile["ip-version"] = "IPv4"
277 if RO_ip_profile.get("ip-version") == "ipv6":
278 RO_ip_profile["ip-version"] = "IPv6"
279 if "dhcp-params" in RO_ip_profile:
280 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
281 return RO_ip_profile
282
283 if not ns_params:
284 return None
285 RO_ns_params = {
286 # "name": ns_params["nsName"],
287 # "description": ns_params.get("nsDescription"),
288 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
289 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
290 # "scenario": ns_params["nsdId"],
291 }
292
293 n2vc_key_list = n2vc_key_list or []
294 for vnfd_ref, vnfd in vnfd_dict.items():
295 vdu_needed_access = []
296 mgmt_cp = None
297 if vnfd.get("vnf-configuration"):
298 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
299 if ssh_required and vnfd.get("mgmt-interface"):
300 if vnfd["mgmt-interface"].get("vdu-id"):
301 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
302 elif vnfd["mgmt-interface"].get("cp"):
303 mgmt_cp = vnfd["mgmt-interface"]["cp"]
304
305 for vdu in vnfd.get("vdu", ()):
306 if vdu.get("vdu-configuration"):
307 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
308 if ssh_required:
309 vdu_needed_access.append(vdu["id"])
310 elif mgmt_cp:
311 for vdu_interface in vdu.get("interface"):
312 if vdu_interface.get("external-connection-point-ref") and \
313 vdu_interface["external-connection-point-ref"] == mgmt_cp:
314 vdu_needed_access.append(vdu["id"])
315 mgmt_cp = None
316 break
317
318 if vdu_needed_access:
319 for vnf_member in nsd.get("constituent-vnfd"):
320 if vnf_member["vnfd-id-ref"] != vnfd_ref:
321 continue
322 for vdu in vdu_needed_access:
323 populate_dict(RO_ns_params,
324 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
325 n2vc_key_list)
326
327 if ns_params.get("vduImage"):
328 RO_ns_params["vduImage"] = ns_params["vduImage"]
329
330 if ns_params.get("ssh_keys"):
331 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
332 for vnf_params in get_iterable(ns_params, "vnf"):
333 for constituent_vnfd in nsd["constituent-vnfd"]:
334 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
335 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
336 break
337 else:
338 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
339 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
340 if vnf_params.get("vimAccountId"):
341 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
342 vim_account_2_RO(vnf_params["vimAccountId"]))
343
344 for vdu_params in get_iterable(vnf_params, "vdu"):
345 # TODO feature 1417: check that this VDU exist and it is not a PDU
346 if vdu_params.get("volume"):
347 for volume_params in vdu_params["volume"]:
348 if volume_params.get("vim-volume-id"):
349 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
350 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
351 volume_params["vim-volume-id"])
352 if vdu_params.get("interface"):
353 for interface_params in vdu_params["interface"]:
354 if interface_params.get("ip-address"):
355 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
356 vdu_params["id"], "interfaces", interface_params["name"],
357 "ip_address"),
358 interface_params["ip-address"])
359 if interface_params.get("mac-address"):
360 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
361 vdu_params["id"], "interfaces", interface_params["name"],
362 "mac_address"),
363 interface_params["mac-address"])
364 if interface_params.get("floating-ip-required"):
365 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
366 vdu_params["id"], "interfaces", interface_params["name"],
367 "floating-ip"),
368 interface_params["floating-ip-required"])
369
370 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
371 if internal_vld_params.get("vim-network-name"):
372 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
373 internal_vld_params["name"], "vim-network-name"),
374 internal_vld_params["vim-network-name"])
375 if internal_vld_params.get("vim-network-id"):
376 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
377 internal_vld_params["name"], "vim-network-id"),
378 internal_vld_params["vim-network-id"])
379 if internal_vld_params.get("ip-profile"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
381 internal_vld_params["name"], "ip-profile"),
382 ip_profile_2_RO(internal_vld_params["ip-profile"]))
383 if internal_vld_params.get("provider-network"):
384
385 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
386 internal_vld_params["name"], "provider-network"),
387 internal_vld_params["provider-network"].copy())
388
389 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
390 # look for interface
391 iface_found = False
392 for vdu_descriptor in vnf_descriptor["vdu"]:
393 for vdu_interface in vdu_descriptor["interface"]:
394 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
395 if icp_params.get("ip-address"):
396 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
397 vdu_descriptor["id"], "interfaces",
398 vdu_interface["name"], "ip_address"),
399 icp_params["ip-address"])
400
401 if icp_params.get("mac-address"):
402 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
403 vdu_descriptor["id"], "interfaces",
404 vdu_interface["name"], "mac_address"),
405 icp_params["mac-address"])
406 iface_found = True
407 break
408 if iface_found:
409 break
410 else:
411 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
412 "internal-vld:id-ref={} is not present at vnfd:internal-"
413 "connection-point".format(vnf_params["member-vnf-index"],
414 icp_params["id-ref"]))
415
416 for vld_params in get_iterable(ns_params, "vld"):
417 if "ip-profile" in vld_params:
418 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
419 ip_profile_2_RO(vld_params["ip-profile"]))
420
421 if vld_params.get("provider-network"):
422
423 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
424 vld_params["provider-network"].copy())
425
426 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
427 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
428 wim_account_2_RO(vld_params["wimAccountId"])),
429 if vld_params.get("vim-network-name"):
430 RO_vld_sites = []
431 if isinstance(vld_params["vim-network-name"], dict):
432 for vim_account, vim_net in vld_params["vim-network-name"].items():
433 RO_vld_sites.append({
434 "netmap-use": vim_net,
435 "datacenter": vim_account_2_RO(vim_account)
436 })
437 else: # isinstance str
438 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
439 if RO_vld_sites:
440 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
441
442 if vld_params.get("vim-network-id"):
443 RO_vld_sites = []
444 if isinstance(vld_params["vim-network-id"], dict):
445 for vim_account, vim_net in vld_params["vim-network-id"].items():
446 RO_vld_sites.append({
447 "netmap-use": vim_net,
448 "datacenter": vim_account_2_RO(vim_account)
449 })
450 else: # isinstance str
451 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
452 if RO_vld_sites:
453 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
454 if vld_params.get("ns-net"):
455 if isinstance(vld_params["ns-net"], dict):
456 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
457 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
458 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
459 if "vnfd-connection-point-ref" in vld_params:
460 for cp_params in vld_params["vnfd-connection-point-ref"]:
461 # look for interface
462 for constituent_vnfd in nsd["constituent-vnfd"]:
463 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
464 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
465 break
466 else:
467 raise LcmException(
468 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
469 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
470 match_cp = False
471 for vdu_descriptor in vnf_descriptor["vdu"]:
472 for interface_descriptor in vdu_descriptor["interface"]:
473 if interface_descriptor.get("external-connection-point-ref") == \
474 cp_params["vnfd-connection-point-ref"]:
475 match_cp = True
476 break
477 if match_cp:
478 break
479 else:
480 raise LcmException(
481 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
482 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
483 cp_params["member-vnf-index-ref"],
484 cp_params["vnfd-connection-point-ref"],
485 vnf_descriptor["id"]))
486 if cp_params.get("ip-address"):
487 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
488 vdu_descriptor["id"], "interfaces",
489 interface_descriptor["name"], "ip_address"),
490 cp_params["ip-address"])
491 if cp_params.get("mac-address"):
492 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
493 vdu_descriptor["id"], "interfaces",
494 interface_descriptor["name"], "mac_address"),
495 cp_params["mac-address"])
496 return RO_ns_params
497
498 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
499 # make a copy to do not change
500 vdu_create = copy(vdu_create)
501 vdu_delete = copy(vdu_delete)
502
503 vdurs = db_vnfr.get("vdur")
504 if vdurs is None:
505 vdurs = []
506 vdu_index = len(vdurs)
507 while vdu_index:
508 vdu_index -= 1
509 vdur = vdurs[vdu_index]
510 if vdur.get("pdu-type"):
511 continue
512 vdu_id_ref = vdur["vdu-id-ref"]
513 if vdu_create and vdu_create.get(vdu_id_ref):
514 for index in range(0, vdu_create[vdu_id_ref]):
515 vdur = deepcopy(vdur)
516 vdur["_id"] = str(uuid4())
517 vdur["count-index"] += 1
518 vdurs.insert(vdu_index+1+index, vdur)
519 del vdu_create[vdu_id_ref]
520 if vdu_delete and vdu_delete.get(vdu_id_ref):
521 del vdurs[vdu_index]
522 vdu_delete[vdu_id_ref] -= 1
523 if not vdu_delete[vdu_id_ref]:
524 del vdu_delete[vdu_id_ref]
525 # check all operations are done
526 if vdu_create or vdu_delete:
527 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
528 vdu_create))
529 if vdu_delete:
530 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
531 vdu_delete))
532
533 vnfr_update = {"vdur": vdurs}
534 db_vnfr["vdur"] = vdurs
535 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
536
537 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
538 """
539 Updates database nsr with the RO info for the created vld
540 :param ns_update_nsr: dictionary to be filled with the updated info
541 :param db_nsr: content of db_nsr. This is also modified
542 :param nsr_desc_RO: nsr descriptor from RO
543 :return: Nothing, LcmException is raised on errors
544 """
545
546 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
547 for net_RO in get_iterable(nsr_desc_RO, "nets"):
548 if vld["id"] != net_RO.get("ns_net_osm_id"):
549 continue
550 vld["vim-id"] = net_RO.get("vim_net_id")
551 vld["name"] = net_RO.get("vim_name")
552 vld["status"] = net_RO.get("status")
553 vld["status-detailed"] = net_RO.get("error_msg")
554 ns_update_nsr["vld.{}".format(vld_index)] = vld
555 break
556 else:
557 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
558
559 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
560 """
561 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
562 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
563 :param nsr_desc_RO: nsr descriptor from RO
564 :return: Nothing, LcmException is raised on errors
565 """
566 for vnf_index, db_vnfr in db_vnfrs.items():
567 for vnf_RO in nsr_desc_RO["vnfs"]:
568 if vnf_RO["member_vnf_index"] != vnf_index:
569 continue
570 vnfr_update = {}
571 if vnf_RO.get("ip_address"):
572 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
573 elif not db_vnfr.get("ip-address"):
574 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
575
576 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
577 vdur_RO_count_index = 0
578 if vdur.get("pdu-type"):
579 continue
580 for vdur_RO in get_iterable(vnf_RO, "vms"):
581 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
582 continue
583 if vdur["count-index"] != vdur_RO_count_index:
584 vdur_RO_count_index += 1
585 continue
586 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
587 if vdur_RO.get("ip_address"):
588 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
589 else:
590 vdur["ip-address"] = None
591 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
592 vdur["name"] = vdur_RO.get("vim_name")
593 vdur["status"] = vdur_RO.get("status")
594 vdur["status-detailed"] = vdur_RO.get("error_msg")
595 for ifacer in get_iterable(vdur, "interfaces"):
596 for interface_RO in get_iterable(vdur_RO, "interfaces"):
597 if ifacer["name"] == interface_RO.get("internal_name"):
598 ifacer["ip-address"] = interface_RO.get("ip_address")
599 ifacer["mac-address"] = interface_RO.get("mac_address")
600 break
601 else:
602 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
603 "from VIM info"
604 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
605 vnfr_update["vdur.{}".format(vdu_index)] = vdur
606 break
607 else:
608 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
609 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
610
611 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
612 for net_RO in get_iterable(nsr_desc_RO, "nets"):
613 if vld["id"] != net_RO.get("vnf_net_osm_id"):
614 continue
615 vld["vim-id"] = net_RO.get("vim_net_id")
616 vld["name"] = net_RO.get("vim_name")
617 vld["status"] = net_RO.get("status")
618 vld["status-detailed"] = net_RO.get("error_msg")
619 vnfr_update["vld.{}".format(vld_index)] = vld
620 break
621 else:
622 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
623 vnf_index, vld["id"]))
624
625 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
626 break
627
628 else:
629 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
630
631 def _get_ns_config_info(self, nsr_id):
632 """
633 Generates a mapping between vnf,vdu elements and the N2VC id
634 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
635 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
636 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
637 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
638 """
639 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
640 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
641 mapping = {}
642 ns_config_info = {"osm-config-mapping": mapping}
643 for vca in vca_deployed_list:
644 if not vca["member-vnf-index"]:
645 continue
646 if not vca["vdu_id"]:
647 mapping[vca["member-vnf-index"]] = vca["application"]
648 else:
649 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
650 vca["application"]
651 return ns_config_info
652
653 @staticmethod
654 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
655 """
656 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
657 primitives as verify-ssh-credentials, or config when needed
658 :param desc_primitive_list: information of the descriptor
659 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
660 this element contains a ssh public key
661 :return: The modified list. Can ba an empty list, but always a list
662 """
663 if desc_primitive_list:
664 primitive_list = desc_primitive_list.copy()
665 else:
666 primitive_list = []
667 # look for primitive config, and get the position. None if not present
668 config_position = None
669 for index, primitive in enumerate(primitive_list):
670 if primitive["name"] == "config":
671 config_position = index
672 break
673
674 # for NS, add always a config primitive if not present (bug 874)
675 if not vca_deployed["member-vnf-index"] and config_position is None:
676 primitive_list.insert(0, {"name": "config", "parameter": []})
677 config_position = 0
678 # for VNF/VDU add verify-ssh-credentials after config
679 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
680 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
681 return primitive_list
682
683 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
684 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
685
686 db_nsr_update = {}
687 RO_descriptor_number = 0 # number of descriptors created at RO
688 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
689 start_deploy = time()
690 vdu_flag = False # If any of the VNFDs has VDUs
691 ns_params = db_nslcmop.get("operationParams")
692
693 # deploy RO
694
695 # get vnfds, instantiate at RO
696
697 for c_vnf in nsd.get("constituent-vnfd", ()):
698 member_vnf_index = c_vnf["member-vnf-index"]
699 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
700 if vnfd.get("vdu"):
701 vdu_flag = True
702 vnfd_ref = vnfd["id"]
703 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
704 " RO".format(vnfd_ref, member_vnf_index)
705 # self.logger.debug(logging_text + step)
706 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
707 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
708 RO_descriptor_number += 1
709
710 # look position at deployed.RO.vnfd if not present it will be appended at the end
711 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
712 if vnf_deployed["member-vnf-index"] == member_vnf_index:
713 break
714 else:
715 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
716 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
717
718 # look if present
719 RO_update = {"member-vnf-index": member_vnf_index}
720 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
721 if vnfd_list:
722 RO_update["id"] = vnfd_list[0]["uuid"]
723 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
724 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
725 else:
726 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
727 get("additionalParamsForVnf"), nsr_id)
728 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
729 RO_update["id"] = desc["uuid"]
730 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
731 vnfd_ref, member_vnf_index, desc["uuid"]))
732 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
733 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
734 self.update_db_2("nsrs", nsr_id, db_nsr_update)
735 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
736
737 # create nsd at RO
738 nsd_ref = nsd["id"]
739 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
740 # self.logger.debug(logging_text + step)
741
742 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
743 RO_descriptor_number += 1
744 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
745 if nsd_list:
746 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
747 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
748 nsd_ref, RO_nsd_uuid))
749 else:
750 nsd_RO = deepcopy(nsd)
751 nsd_RO["id"] = RO_osm_nsd_id
752 nsd_RO.pop("_id", None)
753 nsd_RO.pop("_admin", None)
754 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
755 member_vnf_index = c_vnf["member-vnf-index"]
756 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
757 for c_vld in nsd_RO.get("vld", ()):
758 for cp in c_vld.get("vnfd-connection-point-ref", ()):
759 member_vnf_index = cp["member-vnf-index-ref"]
760 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
761
762 desc = await self.RO.create("nsd", descriptor=nsd_RO)
763 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
764 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
765 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
766 self.update_db_2("nsrs", nsr_id, db_nsr_update)
767 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
768
769 # Crate ns at RO
770 # if present use it unless in error status
771 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
772 if RO_nsr_id:
773 try:
774 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
775 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
776 desc = await self.RO.show("ns", RO_nsr_id)
777 except ROclient.ROClientException as e:
778 if e.http_code != HTTPStatus.NOT_FOUND:
779 raise
780 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
781 if RO_nsr_id:
782 ns_status, ns_status_info = self.RO.check_ns_status(desc)
783 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
784 if ns_status == "ERROR":
785 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
786 .format(RO_nsr_id)
787 self.logger.debug(logging_text + step)
788 await self.RO.delete("ns", RO_nsr_id)
789 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
790 if not RO_nsr_id:
791 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
792 # self.logger.debug(logging_text + step)
793
794 # check if VIM is creating and wait look if previous tasks in process
795 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
796 if task_dependency:
797 step = "Waiting for related tasks to be completed: {}".format(task_name)
798 self.logger.debug(logging_text + step)
799 await asyncio.wait(task_dependency, timeout=3600)
800 if ns_params.get("vnf"):
801 for vnf in ns_params["vnf"]:
802 if "vimAccountId" in vnf:
803 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
804 vnf["vimAccountId"])
805 if task_dependency:
806 step = "Waiting for related tasks to be completed: {}".format(task_name)
807 self.logger.debug(logging_text + step)
808 await asyncio.wait(task_dependency, timeout=3600)
809
810 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
811
812 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
813
814 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
815 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
816 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
817 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
818 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
819 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
820 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
821 self.update_db_2("nsrs", nsr_id, db_nsr_update)
822 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
823
824 # wait until NS is ready
825 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
826 detailed_status_old = None
827 self.logger.debug(logging_text + step)
828
829 while time() <= start_deploy + self.total_deploy_timeout:
830 desc = await self.RO.show("ns", RO_nsr_id)
831 ns_status, ns_status_info = self.RO.check_ns_status(desc)
832 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
833 if ns_status == "ERROR":
834 raise ROclient.ROClientException(ns_status_info)
835 elif ns_status == "BUILD":
836 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
837 elif ns_status == "ACTIVE":
838 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
839 try:
840 if vdu_flag:
841 self.ns_update_vnfr(db_vnfrs, desc)
842 break
843 except LcmExceptionNoMgmtIP:
844 pass
845 else:
846 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
847 if detailed_status != detailed_status_old:
848 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
849 self.update_db_2("nsrs", nsr_id, db_nsr_update)
850 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
851 await asyncio.sleep(5, loop=self.loop)
852 else: # total_deploy_timeout
853 raise ROclient.ROClientException("Timeout waiting ns to be ready")
854
855 step = "Updating NSR"
856 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
857
858 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
859 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
860 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
861 self.update_db_2("nsrs", nsr_id, db_nsr_update)
862 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
863
864 step = "Deployed at VIM"
865 self.logger.debug(logging_text + step)
866
867 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
868 """
869 Wait for ip addres at RO, and optionally, insert public key in virtual machine
870 :param logging_text: prefix use for logging
871 :param nsr_id:
872 :param vnfr_id:
873 :param vdu_id:
874 :param vdu_index:
875 :param pub_key: public ssh key to inject, None to skip
876 :param user: user to apply the public ssh key
877 :return: IP address
878 """
879
880 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
881 ro_nsr_id = None
882 ip_address = None
883 nb_tries = 0
884 target_vdu_id = None
885 ro_retries = 0
886
887 while True:
888
889 ro_retries += 1
890 if ro_retries >= 360: # 1 hour
891 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
892
893 await asyncio.sleep(10, loop=self.loop)
894 # wait until NS is deployed at RO
895 if not ro_nsr_id:
896 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
897 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
898 if not ro_nsr_id:
899 continue
900
901 # get ip address
902 if not target_vdu_id:
903 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
904
905 if not vdu_id: # for the VNF case
906 ip_address = db_vnfr.get("ip-address")
907 if not ip_address:
908 continue
909 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
910 else: # VDU case
911 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
912 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
913
914 if not vdur:
915 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
916 vnfr_id, vdu_id, vdu_index
917 ))
918
919 if vdur.get("status") == "ACTIVE":
920 ip_address = vdur.get("ip-address")
921 if not ip_address:
922 continue
923 target_vdu_id = vdur["vdu-id-ref"]
924 elif vdur.get("status") == "ERROR":
925 raise LcmException("Cannot inject ssh-key because target VM is in error state")
926
927 if not target_vdu_id:
928 continue
929
930 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
931
932 # inject public key into machine
933 if pub_key and user:
934 # self.logger.debug(logging_text + "Inserting RO key")
935 try:
936 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
937 result_dict = await self.RO.create_action(
938 item="ns",
939 item_id_name=ro_nsr_id,
940 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
941 )
942 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
943 if not result_dict or not isinstance(result_dict, dict):
944 raise LcmException("Unknown response from RO when injecting key")
945 for result in result_dict.values():
946 if result.get("vim_result") == 200:
947 break
948 else:
949 raise ROclient.ROClientException("error injecting key: {}".format(
950 result.get("description")))
951 break
952 except ROclient.ROClientException as e:
953 if not nb_tries:
954 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
955 format(e, 20*10))
956 nb_tries += 1
957 if nb_tries >= 20:
958 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
959 else:
960 break
961
962 return ip_address
963
964 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
965 """
966 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
967 """
968 my_vca = vca_deployed_list[vca_index]
969 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
970 return
971 timeout = 300
972 while timeout >= 0:
973 for index, vca_deployed in enumerate(vca_deployed_list):
974 if index == vca_index:
975 continue
976 if not my_vca.get("member-vnf-index") or \
977 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
978 if not vca_deployed.get("instantiation"):
979 break # wait
980 if vca_deployed["instantiation"] == "FAILED":
981 raise LcmException("Configuration aborted because dependent charm/s has failed")
982 else:
983 return
984 await asyncio.sleep(10)
985 timeout -= 1
986 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
987 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
988
989 raise LcmException("Configuration aborted because dependent charm/s timeout")
990
991 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
992 kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
993 nsr_id = db_nsr["_id"]
994 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
995 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
996 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
997 db_dict = {
998 'collection': 'nsrs',
999 'filter': {'_id': nsr_id},
1000 'path': db_update_entry
1001 }
1002 step = ""
1003 try:
1004 vnfr_id = None
1005 if db_vnfr:
1006 vnfr_id = db_vnfr["_id"]
1007
1008 namespace = "{nsi}.{ns}".format(
1009 nsi=nsi_id if nsi_id else "",
1010 ns=nsr_id)
1011 if vnfr_id:
1012 namespace += "." + vnfr_id
1013 if vdu_id:
1014 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1015
1016 # Get artifact path
1017 artifact_path = "{}/{}/charms/{}".format(
1018 base_folder["folder"],
1019 base_folder["pkg-dir"],
1020 config_descriptor["juju"]["charm"]
1021 )
1022
1023 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1024 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1025 is_proxy_charm = False
1026
1027 # n2vc_redesign STEP 3.1
1028
1029 # find old ee_id if exists
1030 ee_id = vca_deployed.get("ee_id")
1031
1032 # create or register execution environment in VCA
1033 if is_proxy_charm:
1034 step = "create execution environment"
1035 self.logger.debug(logging_text + step)
1036 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1037 reuse_ee_id=ee_id,
1038 db_dict=db_dict)
1039 else:
1040 step = "Waiting to VM being up and getting IP address"
1041 self.logger.debug(logging_text + step)
1042 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1043 user=None, pub_key=None)
1044 credentials = {"hostname": rw_mgmt_ip}
1045 # get username
1046 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1047 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1048 # merged. Meanwhile let's get username from initial-config-primitive
1049 if not username and config_descriptor.get("initial-config-primitive"):
1050 for config_primitive in config_descriptor["initial-config-primitive"]:
1051 for param in config_primitive.get("parameter", ()):
1052 if param["name"] == "ssh-username":
1053 username = param["value"]
1054 break
1055 if not username:
1056 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1057 "'config-access.ssh-access.default-user'")
1058 credentials["username"] = username
1059 # n2vc_redesign STEP 3.2
1060
1061 step = "register execution environment {}".format(credentials)
1062 self.logger.debug(logging_text + step)
1063 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1064 db_dict=db_dict)
1065
1066 # for compatibility with MON/POL modules, the need model and application name at database
1067 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1068 ee_id_parts = ee_id.split('.')
1069 model_name = ee_id_parts[0]
1070 application_name = ee_id_parts[1]
1071 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1072 db_update_entry + "application": application_name,
1073 db_update_entry + "ee_id": ee_id})
1074
1075 # n2vc_redesign STEP 3.3
1076
1077 step = "Install configuration Software"
1078 # TODO check if already done
1079 self.logger.debug(logging_text + step)
1080 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1081
1082 # if SSH access is required, then get execution environment SSH public
1083 if is_proxy_charm: # if native charm we have waited already to VM be UP
1084 pub_key = None
1085 user = None
1086 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1087 # Needed to inject a ssh key
1088 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1089 step = "Install configuration Software, getting public ssh key"
1090 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1091
1092 step = "Insert public key into VM"
1093 else:
1094 step = "Waiting to VM being up and getting IP address"
1095 self.logger.debug(logging_text + step)
1096
1097 # n2vc_redesign STEP 5.1
1098 # wait for RO (ip-address) Insert pub_key into VM
1099 if vnfr_id:
1100 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1101 user=user, pub_key=pub_key)
1102 else:
1103 rw_mgmt_ip = None # This is for a NS configuration
1104
1105 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1106
1107 # store rw_mgmt_ip in deploy params for later replacement
1108 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1109
1110 # n2vc_redesign STEP 6 Execute initial config primitive
1111 step = 'execute initial config primitive'
1112 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1113
1114 # sort initial config primitives by 'seq'
1115 try:
1116 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1117 except Exception as e:
1118 self.logger.error(logging_text + step + ": " + str(e))
1119
1120 # add config if not present for NS charm
1121 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1122 vca_deployed)
1123 if initial_config_primitive_list:
1124 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1125 for initial_config_primitive in initial_config_primitive_list:
1126 # adding information on the vca_deployed if it is a NS execution environment
1127 if not vca_deployed["member-vnf-index"]:
1128 deploy_params["ns_config_info"] = self._get_ns_config_info(nsr_id)
1129 # TODO check if already done
1130 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1131
1132 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1133 self.logger.debug(logging_text + step)
1134 await self.n2vc.exec_primitive(
1135 ee_id=ee_id,
1136 primitive_name=initial_config_primitive["name"],
1137 params_dict=primitive_params_,
1138 db_dict=db_dict
1139 )
1140 # TODO register in database that primitive is done
1141
1142 step = "instantiated at VCA"
1143 self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "COMPLETED"})
1144 self.logger.debug(logging_text + step)
1145
1146 except Exception as e: # TODO not use Exception but N2VC exception
1147 self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1148 raise Exception("{} {}".format(step, e)) from e
1149 # TODO raise N2VC exception with 'step' extra information
1150
1151 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1152 error_description: str = None, error_detail: str = None):
1153 try:
1154 db_dict = dict()
1155 if ns_state:
1156 db_dict["nsState"] = ns_state
1157 db_dict["currentOperation"] = current_operation
1158 db_dict["currentOperationID"] = current_operation_id
1159 db_dict["errorDescription"] = error_description
1160 db_dict["errorDetail"] = error_detail
1161 self.update_db_2("nsrs", nsr_id, db_dict)
1162 except Exception as e:
1163 self.logger.warn('Error writing NS status: {}'.format(e))
1164
1165 async def instantiate(self, nsr_id, nslcmop_id):
1166 """
1167
1168 :param nsr_id: ns instance to deploy
1169 :param nslcmop_id: operation to run
1170 :return:
1171 """
1172
1173 # Try to lock HA task here
1174 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1175 if not task_is_locked_by_me:
1176 self.logger.debug('instantiate() task is not locked by me')
1177 return
1178
1179 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1180 self.logger.debug(logging_text + "Enter")
1181
1182 # get all needed from database
1183
1184 # database nsrs record
1185 db_nsr = None
1186
1187 # database nslcmops record
1188 db_nslcmop = None
1189
1190 # update operation on nsrs
1191 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1192 "_admin.current-operation": nslcmop_id,
1193 "_admin.operation-type": "instantiate"}
1194 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1195
1196 # update operation on nslcmops
1197 db_nslcmop_update = {}
1198
1199 nslcmop_operation_state = None
1200 db_vnfrs = {} # vnf's info indexed by member-index
1201 # n2vc_info = {}
1202 task_instantiation_list = []
1203 task_instantiation_info = {} # from task to info text
1204 exc = None
1205 try:
1206 # wait for any previous tasks in process
1207 step = "Waiting for previous operations to terminate"
1208 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1209
1210 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1211
1212 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1213 self._write_ns_status(
1214 nsr_id=nsr_id,
1215 ns_state="BUILDING",
1216 current_operation="INSTANTIATING",
1217 current_operation_id=nslcmop_id
1218 )
1219
1220 # read from db: operation
1221 step = "Getting nslcmop={} from db".format(nslcmop_id)
1222 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1223
1224 # read from db: ns
1225 step = "Getting nsr={} from db".format(nsr_id)
1226 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1227 # nsd is replicated into ns (no db read)
1228 nsd = db_nsr["nsd"]
1229 # nsr_name = db_nsr["name"] # TODO short-name??
1230
1231 # read from db: vnf's of this ns
1232 step = "Getting vnfrs from db"
1233 self.logger.debug(logging_text + step)
1234 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1235
1236 # read from db: vnfd's for every vnf
1237 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1238 db_vnfds = {} # every vnfd data indexed by vnf id
1239 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1240
1241 # for each vnf in ns, read vnfd
1242 for vnfr in db_vnfrs_list:
1243 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1244 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1245 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1246 # if we haven't this vnfd, read it from db
1247 if vnfd_id not in db_vnfds:
1248 # read from cb
1249 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1250 self.logger.debug(logging_text + step)
1251 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1252
1253 # store vnfd
1254 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1255 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1256 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1257
1258 # Get or generates the _admin.deployed.VCA list
1259 vca_deployed_list = None
1260 if db_nsr["_admin"].get("deployed"):
1261 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1262 if vca_deployed_list is None:
1263 vca_deployed_list = []
1264 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1265 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1266 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1267 elif isinstance(vca_deployed_list, dict):
1268 # maintain backward compatibility. Change a dict to list at database
1269 vca_deployed_list = list(vca_deployed_list.values())
1270 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1271 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1272
1273 db_nsr_update["detailed-status"] = "creating"
1274 db_nsr_update["operational-status"] = "init"
1275
1276 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1277 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1278 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1279
1280 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1281 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1282 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1283 self.logger.debug(logging_text + "Before deploy_kdus")
1284 # Call to deploy_kdus in case exists the "vdu:kdu" param
1285 task_kdu = asyncio.ensure_future(
1286 self.deploy_kdus(
1287 logging_text=logging_text,
1288 nsr_id=nsr_id,
1289 db_nsr=db_nsr,
1290 db_vnfrs=db_vnfrs,
1291 )
1292 )
1293 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1294 task_instantiation_info[task_kdu] = "Deploy KDUs"
1295 task_instantiation_list.append(task_kdu)
1296 # n2vc_redesign STEP 1 Get VCA public ssh-key
1297 # feature 1429. Add n2vc public key to needed VMs
1298 n2vc_key = self.n2vc.get_public_key()
1299 n2vc_key_list = [n2vc_key]
1300 if self.vca_config.get("public_key"):
1301 n2vc_key_list.append(self.vca_config["public_key"])
1302
1303 # n2vc_redesign STEP 2 Deploy Network Scenario
1304 task_ro = asyncio.ensure_future(
1305 self.instantiate_RO(
1306 logging_text=logging_text,
1307 nsr_id=nsr_id,
1308 nsd=nsd,
1309 db_nsr=db_nsr,
1310 db_nslcmop=db_nslcmop,
1311 db_vnfrs=db_vnfrs,
1312 db_vnfds_ref=db_vnfds_ref,
1313 n2vc_key_list=n2vc_key_list
1314 )
1315 )
1316 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1317 task_instantiation_info[task_ro] = "Deploy at VIM"
1318 task_instantiation_list.append(task_ro)
1319
1320 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1321 step = "Looking for needed vnfd to configure with proxy charm"
1322 self.logger.debug(logging_text + step)
1323
1324 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1325 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1326 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1327 vnfd_id = c_vnf["vnfd-id-ref"]
1328 vnfd = db_vnfds_ref[vnfd_id]
1329 member_vnf_index = str(c_vnf["member-vnf-index"])
1330 db_vnfr = db_vnfrs[member_vnf_index]
1331 base_folder = vnfd["_admin"]["storage"]
1332 vdu_id = None
1333 vdu_index = 0
1334 vdu_name = None
1335 kdu_name = None
1336
1337 # Get additional parameters
1338 deploy_params = {}
1339 if db_vnfr.get("additionalParamsForVnf"):
1340 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1341
1342 descriptor_config = vnfd.get("vnf-configuration")
1343 if descriptor_config and descriptor_config.get("juju"):
1344 self._deploy_n2vc(
1345 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1346 db_nsr=db_nsr,
1347 db_vnfr=db_vnfr,
1348 nslcmop_id=nslcmop_id,
1349 nsr_id=nsr_id,
1350 nsi_id=nsi_id,
1351 vnfd_id=vnfd_id,
1352 vdu_id=vdu_id,
1353 kdu_name=kdu_name,
1354 member_vnf_index=member_vnf_index,
1355 vdu_index=vdu_index,
1356 vdu_name=vdu_name,
1357 deploy_params=deploy_params,
1358 descriptor_config=descriptor_config,
1359 base_folder=base_folder,
1360 task_instantiation_list=task_instantiation_list,
1361 task_instantiation_info=task_instantiation_info
1362 )
1363
1364 # Deploy charms for each VDU that supports one.
1365 for vdud in get_iterable(vnfd, 'vdu'):
1366 vdu_id = vdud["id"]
1367 descriptor_config = vdud.get('vdu-configuration')
1368 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1369 if vdur.get("additionalParams"):
1370 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1371 else:
1372 deploy_params_vdu = deploy_params
1373 if descriptor_config and descriptor_config.get("juju"):
1374 # look for vdu index in the db_vnfr["vdu"] section
1375 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1376 # if vdur["vdu-id-ref"] == vdu_id:
1377 # break
1378 # else:
1379 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1380 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1381 # vdu_name = vdur.get("name")
1382 vdu_name = None
1383 kdu_name = None
1384 for vdu_index in range(int(vdud.get("count", 1))):
1385 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1386 self._deploy_n2vc(
1387 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1388 member_vnf_index, vdu_id, vdu_index),
1389 db_nsr=db_nsr,
1390 db_vnfr=db_vnfr,
1391 nslcmop_id=nslcmop_id,
1392 nsr_id=nsr_id,
1393 nsi_id=nsi_id,
1394 vnfd_id=vnfd_id,
1395 vdu_id=vdu_id,
1396 kdu_name=kdu_name,
1397 member_vnf_index=member_vnf_index,
1398 vdu_index=vdu_index,
1399 vdu_name=vdu_name,
1400 deploy_params=deploy_params_vdu,
1401 descriptor_config=descriptor_config,
1402 base_folder=base_folder,
1403 task_instantiation_list=task_instantiation_list,
1404 task_instantiation_info=task_instantiation_info
1405 )
1406 for kdud in get_iterable(vnfd, 'kdu'):
1407 kdu_name = kdud["name"]
1408 descriptor_config = kdud.get('kdu-configuration')
1409 if descriptor_config and descriptor_config.get("juju"):
1410 vdu_id = None
1411 vdu_index = 0
1412 vdu_name = None
1413 # look for vdu index in the db_vnfr["vdu"] section
1414 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1415 # if vdur["vdu-id-ref"] == vdu_id:
1416 # break
1417 # else:
1418 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1419 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1420 # vdu_name = vdur.get("name")
1421 # vdu_name = None
1422
1423 self._deploy_n2vc(
1424 logging_text=logging_text,
1425 db_nsr=db_nsr,
1426 db_vnfr=db_vnfr,
1427 nslcmop_id=nslcmop_id,
1428 nsr_id=nsr_id,
1429 nsi_id=nsi_id,
1430 vnfd_id=vnfd_id,
1431 vdu_id=vdu_id,
1432 kdu_name=kdu_name,
1433 member_vnf_index=member_vnf_index,
1434 vdu_index=vdu_index,
1435 vdu_name=vdu_name,
1436 deploy_params=deploy_params,
1437 descriptor_config=descriptor_config,
1438 base_folder=base_folder,
1439 task_instantiation_list=task_instantiation_list,
1440 task_instantiation_info=task_instantiation_info
1441 )
1442
1443 # Check if this NS has a charm configuration
1444 descriptor_config = nsd.get("ns-configuration")
1445 if descriptor_config and descriptor_config.get("juju"):
1446 vnfd_id = None
1447 db_vnfr = None
1448 member_vnf_index = None
1449 vdu_id = None
1450 kdu_name = None
1451 vdu_index = 0
1452 vdu_name = None
1453
1454 # Get additional parameters
1455 deploy_params = {}
1456 if db_nsr.get("additionalParamsForNs"):
1457 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1458 base_folder = nsd["_admin"]["storage"]
1459 self._deploy_n2vc(
1460 logging_text=logging_text,
1461 db_nsr=db_nsr,
1462 db_vnfr=db_vnfr,
1463 nslcmop_id=nslcmop_id,
1464 nsr_id=nsr_id,
1465 nsi_id=nsi_id,
1466 vnfd_id=vnfd_id,
1467 vdu_id=vdu_id,
1468 kdu_name=kdu_name,
1469 member_vnf_index=member_vnf_index,
1470 vdu_index=vdu_index,
1471 vdu_name=vdu_name,
1472 deploy_params=deploy_params,
1473 descriptor_config=descriptor_config,
1474 base_folder=base_folder,
1475 task_instantiation_list=task_instantiation_list,
1476 task_instantiation_info=task_instantiation_info
1477 )
1478
1479 # Wait until all tasks of "task_instantiation_list" have been finished
1480
1481 # while time() <= start_deploy + self.total_deploy_timeout:
1482 error_text_list = []
1483 timeout = 3600
1484
1485 # let's begin with all OK
1486 instantiated_ok = True
1487 # let's begin with RO 'running' status (later we can change it)
1488 db_nsr_update["operational-status"] = "running"
1489 # let's begin with VCA 'configured' status (later we can change it)
1490 db_nsr_update["config-status"] = "configured"
1491
1492 if task_instantiation_list:
1493 # wait for all tasks completion
1494 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1495
1496 for task in pending:
1497 instantiated_ok = False
1498 if task == task_ro:
1499 db_nsr_update["operational-status"] = "failed"
1500 else:
1501 db_nsr_update["config-status"] = "failed"
1502 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1503 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1504 for task in done:
1505 if task.cancelled():
1506 instantiated_ok = False
1507 if task == task_ro:
1508 db_nsr_update["operational-status"] = "failed"
1509 else:
1510 db_nsr_update["config-status"] = "failed"
1511 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1512 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1513 else:
1514 exc = task.exception()
1515 if exc:
1516 instantiated_ok = False
1517 if task == task_ro:
1518 db_nsr_update["operational-status"] = "failed"
1519 else:
1520 db_nsr_update["config-status"] = "failed"
1521 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1522 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1523 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1524 else:
1525 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1526 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1527 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1528 else:
1529 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1530
1531 if error_text_list:
1532 error_text = "\n".join(error_text_list)
1533 db_nsr_update["detailed-status"] = error_text
1534 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1535 db_nslcmop_update["detailed-status"] = error_text
1536 db_nslcmop_update["statusEnteredTime"] = time()
1537 else:
1538 # all is done
1539 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1540 db_nslcmop_update["statusEnteredTime"] = time()
1541 db_nslcmop_update["detailed-status"] = "done"
1542 db_nsr_update["detailed-status"] = "done"
1543
1544 except (ROclient.ROClientException, DbException, LcmException) as e:
1545 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1546 exc = e
1547 except asyncio.CancelledError:
1548 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1549 exc = "Operation was cancelled"
1550 except Exception as e:
1551 exc = traceback.format_exc()
1552 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1553 exc_info=True)
1554 finally:
1555 if exc:
1556 if db_nsr:
1557 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1558 db_nsr_update["operational-status"] = "failed"
1559 db_nsr_update["config-status"] = "failed"
1560 if db_nslcmop:
1561 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1562 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1563 db_nslcmop_update["statusEnteredTime"] = time()
1564 try:
1565 if db_nsr:
1566 db_nsr_update["_admin.nslcmop"] = None
1567 db_nsr_update["_admin.current-operation"] = None
1568 db_nsr_update["_admin.operation-type"] = None
1569 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1570
1571 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1572 ns_state = None
1573 error_description = None
1574 error_detail = None
1575 if instantiated_ok:
1576 ns_state = "READY"
1577 else:
1578 ns_state = "BROKEN"
1579 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1580 error_detail = error_text
1581 self._write_ns_status(
1582 nsr_id=nsr_id,
1583 ns_state=ns_state,
1584 current_operation="IDLE",
1585 current_operation_id=None,
1586 error_description=error_description,
1587 error_detail=error_detail
1588 )
1589
1590 if db_nslcmop_update:
1591 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1592 except DbException as e:
1593 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1594 if nslcmop_operation_state:
1595 try:
1596 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1597 "operationState": nslcmop_operation_state},
1598 loop=self.loop)
1599 except Exception as e:
1600 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1601
1602 self.logger.debug(logging_text + "Exit")
1603 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1604
1605 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1606 # Launch kdus if present in the descriptor
1607
1608 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1609
1610 def _get_cluster_id(cluster_id, cluster_type):
1611 nonlocal k8scluster_id_2_uuic
1612 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1613 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1614
1615 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1616 if not db_k8scluster:
1617 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1618 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1619 if not k8s_id:
1620 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1621 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1622 return k8s_id
1623
1624 logging_text += "Deploy kdus: "
1625 try:
1626 db_nsr_update = {"_admin.deployed.K8s": []}
1627 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1628
1629 # Look for all vnfds
1630 pending_tasks = {}
1631 index = 0
1632 for vnfr_data in db_vnfrs.values():
1633 for kdur in get_iterable(vnfr_data, "kdur"):
1634 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1635 kdumodel = None
1636 k8sclustertype = None
1637 error_text = None
1638 cluster_uuid = None
1639 if kdur.get("helm-chart"):
1640 kdumodel = kdur["helm-chart"]
1641 k8sclustertype = "chart"
1642 k8sclustertype_full = "helm-chart"
1643 elif kdur.get("juju-bundle"):
1644 kdumodel = kdur["juju-bundle"]
1645 k8sclustertype = "juju"
1646 k8sclustertype_full = "juju-bundle"
1647 else:
1648 error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
1649 " running"
1650 try:
1651 if not error_text:
1652 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1653 except LcmException as e:
1654 error_text = str(e)
1655 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1656
1657 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1658 "k8scluster-type": k8sclustertype,
1659 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1660 if error_text:
1661 k8s_instace_info["detailed-status"] = error_text
1662 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1663 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1664 if error_text:
1665 continue
1666
1667 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1668 "{}".format(index)}
1669 if k8sclustertype == "chart":
1670 task = asyncio.ensure_future(
1671 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1672 params=desc_params, db_dict=db_dict, timeout=3600)
1673 )
1674 else:
1675 task = self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1676 atomic=True, params=desc_params,
1677 db_dict=db_dict, timeout=600)
1678
1679 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1680 index += 1
1681 if not pending_tasks:
1682 return
1683 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1684 pending_list = list(pending_tasks.keys())
1685 while pending_list:
1686 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1687 return_when=asyncio.FIRST_COMPLETED)
1688 if not done_list: # timeout
1689 for task in pending_list:
1690 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1691 break
1692 for task in done_list:
1693 exc = task.exception()
1694 if exc:
1695 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1696 else:
1697 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1698
1699 except Exception as e:
1700 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1701 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1702 finally:
1703 # TODO Write in data base
1704 if db_nsr_update:
1705 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1706
1707 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1708 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1709 base_folder, task_instantiation_list, task_instantiation_info):
1710 # launch instantiate_N2VC in a asyncio task and register task object
1711 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1712 # if not found, create one entry and update database
1713
1714 # fill db_nsr._admin.deployed.VCA.<index>
1715 vca_index = -1
1716 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1717 if not vca_deployed:
1718 continue
1719 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1720 vca_deployed.get("vdu_id") == vdu_id and \
1721 vca_deployed.get("kdu_name") == kdu_name and \
1722 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1723 break
1724 else:
1725 # not found, create one.
1726 vca_deployed = {
1727 "member-vnf-index": member_vnf_index,
1728 "vdu_id": vdu_id,
1729 "kdu_name": kdu_name,
1730 "vdu_count_index": vdu_index,
1731 "operational-status": "init", # TODO revise
1732 "detailed-status": "", # TODO revise
1733 "step": "initial-deploy", # TODO revise
1734 "vnfd_id": vnfd_id,
1735 "vdu_name": vdu_name,
1736 }
1737 vca_index += 1
1738 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1739 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1740
1741 # Launch task
1742 task_n2vc = asyncio.ensure_future(
1743 self.instantiate_N2VC(
1744 logging_text=logging_text,
1745 vca_index=vca_index,
1746 nsi_id=nsi_id,
1747 db_nsr=db_nsr,
1748 db_vnfr=db_vnfr,
1749 vdu_id=vdu_id,
1750 kdu_name=kdu_name,
1751 vdu_index=vdu_index,
1752 deploy_params=deploy_params,
1753 config_descriptor=descriptor_config,
1754 base_folder=base_folder,
1755 )
1756 )
1757 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1758 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1759 task_instantiation_list.append(task_n2vc)
1760
1761 # Check if this VNFD has a configured terminate action
1762 def _has_terminate_config_primitive(self, vnfd):
1763 vnf_config = vnfd.get("vnf-configuration")
1764 if vnf_config and vnf_config.get("terminate-config-primitive"):
1765 return True
1766 else:
1767 return False
1768
1769 @staticmethod
1770 def _get_terminate_config_primitive_seq_list(vnfd):
1771 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1772 # No need to check for existing primitive twice, already done before
1773 vnf_config = vnfd.get("vnf-configuration")
1774 seq_list = vnf_config.get("terminate-config-primitive")
1775 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1776 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1777 return seq_list_sorted
1778
1779 @staticmethod
1780 def _create_nslcmop(nsr_id, operation, params):
1781 """
1782 Creates a ns-lcm-opp content to be stored at database.
1783 :param nsr_id: internal id of the instance
1784 :param operation: instantiate, terminate, scale, action, ...
1785 :param params: user parameters for the operation
1786 :return: dictionary following SOL005 format
1787 """
1788 # Raise exception if invalid arguments
1789 if not (nsr_id and operation and params):
1790 raise LcmException(
1791 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1792 now = time()
1793 _id = str(uuid4())
1794 nslcmop = {
1795 "id": _id,
1796 "_id": _id,
1797 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1798 "operationState": "PROCESSING",
1799 "statusEnteredTime": now,
1800 "nsInstanceId": nsr_id,
1801 "lcmOperationType": operation,
1802 "startTime": now,
1803 "isAutomaticInvocation": False,
1804 "operationParams": params,
1805 "isCancelPending": False,
1806 "links": {
1807 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1808 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1809 }
1810 }
1811 return nslcmop
1812
1813 def _format_additional_params(self, params):
1814 params = params or {}
1815 for key, value in params.items():
1816 if str(value).startswith("!!yaml "):
1817 params[key] = yaml.safe_load(value[7:])
1818 return params
1819
1820 def _get_terminate_primitive_params(self, seq, vnf_index):
1821 primitive = seq.get('name')
1822 primitive_params = {}
1823 params = {
1824 "member_vnf_index": vnf_index,
1825 "primitive": primitive,
1826 "primitive_params": primitive_params,
1827 }
1828 desc_params = {}
1829 return self._map_primitive_params(seq, params, desc_params)
1830
1831 # sub-operations
1832
1833 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1834 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1835 if (op.get('operationState') == 'COMPLETED'):
1836 # b. Skip sub-operation
1837 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1838 return self.SUBOPERATION_STATUS_SKIP
1839 else:
1840 # c. Reintent executing sub-operation
1841 # The sub-operation exists, and operationState != 'COMPLETED'
1842 # Update operationState = 'PROCESSING' to indicate a reintent.
1843 operationState = 'PROCESSING'
1844 detailed_status = 'In progress'
1845 self._update_suboperation_status(
1846 db_nslcmop, op_index, operationState, detailed_status)
1847 # Return the sub-operation index
1848 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1849 # with arguments extracted from the sub-operation
1850 return op_index
1851
1852 # Find a sub-operation where all keys in a matching dictionary must match
1853 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1854 def _find_suboperation(self, db_nslcmop, match):
1855 if (db_nslcmop and match):
1856 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1857 for i, op in enumerate(op_list):
1858 if all(op.get(k) == match[k] for k in match):
1859 return i
1860 return self.SUBOPERATION_STATUS_NOT_FOUND
1861
1862 # Update status for a sub-operation given its index
1863 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1864 # Update DB for HA tasks
1865 q_filter = {'_id': db_nslcmop['_id']}
1866 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1867 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1868 self.db.set_one("nslcmops",
1869 q_filter=q_filter,
1870 update_dict=update_dict,
1871 fail_on_empty=False)
1872
1873 # Add sub-operation, return the index of the added sub-operation
1874 # Optionally, set operationState, detailed-status, and operationType
1875 # Status and type are currently set for 'scale' sub-operations:
1876 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1877 # 'detailed-status' : status message
1878 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1879 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1880 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1881 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1882 RO_nsr_id=None, RO_scaling_info=None):
1883 if not (db_nslcmop):
1884 return self.SUBOPERATION_STATUS_NOT_FOUND
1885 # Get the "_admin.operations" list, if it exists
1886 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1887 op_list = db_nslcmop_admin.get('operations')
1888 # Create or append to the "_admin.operations" list
1889 new_op = {'member_vnf_index': vnf_index,
1890 'vdu_id': vdu_id,
1891 'vdu_count_index': vdu_count_index,
1892 'primitive': primitive,
1893 'primitive_params': mapped_primitive_params}
1894 if operationState:
1895 new_op['operationState'] = operationState
1896 if detailed_status:
1897 new_op['detailed-status'] = detailed_status
1898 if operationType:
1899 new_op['lcmOperationType'] = operationType
1900 if RO_nsr_id:
1901 new_op['RO_nsr_id'] = RO_nsr_id
1902 if RO_scaling_info:
1903 new_op['RO_scaling_info'] = RO_scaling_info
1904 if not op_list:
1905 # No existing operations, create key 'operations' with current operation as first list element
1906 db_nslcmop_admin.update({'operations': [new_op]})
1907 op_list = db_nslcmop_admin.get('operations')
1908 else:
1909 # Existing operations, append operation to list
1910 op_list.append(new_op)
1911
1912 db_nslcmop_update = {'_admin.operations': op_list}
1913 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1914 op_index = len(op_list) - 1
1915 return op_index
1916
1917 # Helper methods for scale() sub-operations
1918
1919 # pre-scale/post-scale:
1920 # Check for 3 different cases:
1921 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1922 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1923 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1924 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1925 operationType, RO_nsr_id=None, RO_scaling_info=None):
1926 # Find this sub-operation
1927 if (RO_nsr_id and RO_scaling_info):
1928 operationType = 'SCALE-RO'
1929 match = {
1930 'member_vnf_index': vnf_index,
1931 'RO_nsr_id': RO_nsr_id,
1932 'RO_scaling_info': RO_scaling_info,
1933 }
1934 else:
1935 match = {
1936 'member_vnf_index': vnf_index,
1937 'primitive': vnf_config_primitive,
1938 'primitive_params': primitive_params,
1939 'lcmOperationType': operationType
1940 }
1941 op_index = self._find_suboperation(db_nslcmop, match)
1942 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1943 # a. New sub-operation
1944 # The sub-operation does not exist, add it.
1945 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1946 # The following parameters are set to None for all kind of scaling:
1947 vdu_id = None
1948 vdu_count_index = None
1949 vdu_name = None
1950 if (RO_nsr_id and RO_scaling_info):
1951 vnf_config_primitive = None
1952 primitive_params = None
1953 else:
1954 RO_nsr_id = None
1955 RO_scaling_info = None
1956 # Initial status for sub-operation
1957 operationState = 'PROCESSING'
1958 detailed_status = 'In progress'
1959 # Add sub-operation for pre/post-scaling (zero or more operations)
1960 self._add_suboperation(db_nslcmop,
1961 vnf_index,
1962 vdu_id,
1963 vdu_count_index,
1964 vdu_name,
1965 vnf_config_primitive,
1966 primitive_params,
1967 operationState,
1968 detailed_status,
1969 operationType,
1970 RO_nsr_id,
1971 RO_scaling_info)
1972 return self.SUBOPERATION_STATUS_NEW
1973 else:
1974 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1975 # or op_index (operationState != 'COMPLETED')
1976 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1977
1978 # Helper methods for terminate()
1979
1980 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1981 """ Create a primitive with params from VNFD
1982 Called from terminate() before deleting instance
1983 Calls action() to execute the primitive """
1984 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1985 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1986 db_vnfds = {}
1987 # Loop over VNFRs
1988 for vnfr in db_vnfrs_list:
1989 vnfd_id = vnfr["vnfd-id"]
1990 vnf_index = vnfr["member-vnf-index-ref"]
1991 if vnfd_id not in db_vnfds:
1992 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1993 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1994 db_vnfds[vnfd_id] = vnfd
1995 vnfd = db_vnfds[vnfd_id]
1996 if not self._has_terminate_config_primitive(vnfd):
1997 continue
1998 # Get the primitive's sorted sequence list
1999 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2000 for seq in seq_list:
2001 # For each sequence in list, get primitive and call _ns_execute_primitive()
2002 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2003 vnf_index, seq.get("name"))
2004 self.logger.debug(logging_text + step)
2005 # Create the primitive for each sequence, i.e. "primitive": "touch"
2006 primitive = seq.get('name')
2007 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2008 # The following 3 parameters are currently set to None for 'terminate':
2009 # vdu_id, vdu_count_index, vdu_name
2010 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2011 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2012 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2013 # Add sub-operation
2014 self._add_suboperation(db_nslcmop,
2015 nslcmop_id,
2016 vnf_index,
2017 vdu_id,
2018 vdu_count_index,
2019 vdu_name,
2020 primitive,
2021 mapped_primitive_params)
2022 # Sub-operations: Call _ns_execute_primitive() instead of action()
2023 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2024 # nsr_deployed = db_nsr["_admin"]["deployed"]
2025
2026 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2027 # nsr_id, nslcmop_terminate_action_id)
2028 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2029 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2030 # if result not in result_ok:
2031 # raise LcmException(
2032 # "terminate_primitive_action for vnf_member_index={}",
2033 # " primitive={} fails with error {}".format(
2034 # vnf_index, seq.get("name"), result_detail))
2035
2036 # TODO: find ee_id
2037 ee_id = None
2038 try:
2039 await self.n2vc.exec_primitive(
2040 ee_id=ee_id,
2041 primitive_name=primitive,
2042 params_dict=mapped_primitive_params
2043 )
2044 except Exception as e:
2045 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2046 raise LcmException(
2047 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2048 .format(vnf_index, seq.get("name"), e),
2049 )
2050
2051 async def terminate(self, nsr_id, nslcmop_id):
2052
2053 # Try to lock HA task here
2054 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2055 if not task_is_locked_by_me:
2056 return
2057
2058 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2059 self.logger.debug(logging_text + "Enter")
2060 db_nsr = None
2061 db_nslcmop = None
2062 exc = None
2063 failed_detail = [] # annotates all failed error messages
2064 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2065 "_admin.current-operation": nslcmop_id,
2066 "_admin.operation-type": "terminate"}
2067 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2068 db_nslcmop_update = {}
2069 nslcmop_operation_state = None
2070 autoremove = False # autoremove after terminated
2071 pending_tasks = []
2072 try:
2073 # wait for any previous tasks in process
2074 step = "Waiting for previous operations to terminate"
2075 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2076
2077 self._write_ns_status(
2078 nsr_id=nsr_id,
2079 ns_state="TERMINATING",
2080 current_operation="TERMINATING",
2081 current_operation_id=nslcmop_id
2082 )
2083
2084 step = "Getting nslcmop={} from db".format(nslcmop_id)
2085 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2086 step = "Getting nsr={} from db".format(nsr_id)
2087 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2088 # nsd = db_nsr["nsd"]
2089 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2090 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2091 return
2092 # #TODO check if VIM is creating and wait
2093 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2094 # Call internal terminate action
2095 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2096
2097 pending_tasks = []
2098
2099 db_nsr_update["operational-status"] = "terminating"
2100 db_nsr_update["config-status"] = "terminating"
2101
2102 # remove NS
2103 try:
2104 step = "delete execution environment"
2105 self.logger.debug(logging_text + step)
2106
2107 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2108 pending_tasks.append(task_delete_ee)
2109 except Exception as e:
2110 msg = "Failed while deleting NS in VCA: {}".format(e)
2111 self.logger.error(msg)
2112 failed_detail.append(msg)
2113
2114 try:
2115 # Delete from k8scluster
2116 step = "delete kdus"
2117 self.logger.debug(logging_text + step)
2118 # print(nsr_deployed)
2119 if nsr_deployed:
2120 for kdu in nsr_deployed.get("K8s", ()):
2121 kdu_instance = kdu.get("kdu-instance")
2122 if not kdu_instance:
2123 continue
2124 if kdu.get("k8scluster-type") == "chart":
2125 task_delete_kdu_instance = asyncio.ensure_future(
2126 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2127 kdu_instance=kdu_instance))
2128 elif kdu.get("k8scluster-type") == "juju":
2129 task_delete_kdu_instance = asyncio.ensure_future(
2130 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2131 kdu_instance=kdu_instance))
2132 else:
2133 self.error(logging_text + "Unknown k8s deployment type {}".
2134 format(kdu.get("k8scluster-type")))
2135 continue
2136 pending_tasks.append(task_delete_kdu_instance)
2137 except LcmException as e:
2138 msg = "Failed while deleting KDUs from NS: {}".format(e)
2139 self.logger.error(msg)
2140 failed_detail.append(msg)
2141
2142 # remove from RO
2143 RO_fail = False
2144
2145 # Delete ns
2146 RO_nsr_id = RO_delete_action = None
2147 if nsr_deployed and nsr_deployed.get("RO"):
2148 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2149 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2150 try:
2151 if RO_nsr_id:
2152 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2153 "Deleting ns from VIM"
2154 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2155 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2156 self.logger.debug(logging_text + step)
2157 desc = await self.RO.delete("ns", RO_nsr_id)
2158 RO_delete_action = desc["action_id"]
2159 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2160 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2161 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2162 if RO_delete_action:
2163 # wait until NS is deleted from VIM
2164 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2165 format(RO_nsr_id, RO_delete_action)
2166 detailed_status_old = None
2167 self.logger.debug(logging_text + step)
2168
2169 delete_timeout = 20 * 60 # 20 minutes
2170 while delete_timeout > 0:
2171 desc = await self.RO.show(
2172 "ns",
2173 item_id_name=RO_nsr_id,
2174 extra_item="action",
2175 extra_item_id=RO_delete_action)
2176 ns_status, ns_status_info = self.RO.check_action_status(desc)
2177 if ns_status == "ERROR":
2178 raise ROclient.ROClientException(ns_status_info)
2179 elif ns_status == "BUILD":
2180 detailed_status = step + "; {}".format(ns_status_info)
2181 elif ns_status == "ACTIVE":
2182 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2183 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2184 break
2185 else:
2186 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2187 if detailed_status != detailed_status_old:
2188 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2189 db_nsr_update["detailed-status"] = detailed_status
2190 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2191 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2192 await asyncio.sleep(5, loop=self.loop)
2193 delete_timeout -= 5
2194 else: # delete_timeout <= 0:
2195 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2196
2197 except ROclient.ROClientException as e:
2198 if e.http_code == 404: # not found
2199 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2200 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2201 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2202 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2203 elif e.http_code == 409: # conflict
2204 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2205 self.logger.debug(logging_text + failed_detail[-1])
2206 RO_fail = True
2207 else:
2208 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2209 self.logger.error(logging_text + failed_detail[-1])
2210 RO_fail = True
2211
2212 # Delete nsd
2213 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2214 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2215 try:
2216 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2217 "Deleting nsd from RO"
2218 await self.RO.delete("nsd", RO_nsd_id)
2219 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2220 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2221 except ROclient.ROClientException as e:
2222 if e.http_code == 404: # not found
2223 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2224 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2225 elif e.http_code == 409: # conflict
2226 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2227 self.logger.debug(logging_text + failed_detail[-1])
2228 RO_fail = True
2229 else:
2230 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2231 self.logger.error(logging_text + failed_detail[-1])
2232 RO_fail = True
2233
2234 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2235 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2236 if not vnf_deployed or not vnf_deployed["id"]:
2237 continue
2238 try:
2239 RO_vnfd_id = vnf_deployed["id"]
2240 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2241 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2242 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2243 await self.RO.delete("vnfd", RO_vnfd_id)
2244 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2245 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2246 except ROclient.ROClientException as e:
2247 if e.http_code == 404: # not found
2248 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2249 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2250 elif e.http_code == 409: # conflict
2251 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2252 self.logger.debug(logging_text + failed_detail[-1])
2253 else:
2254 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2255 self.logger.error(logging_text + failed_detail[-1])
2256
2257 if failed_detail:
2258 terminate_ok = False
2259 self.logger.error(logging_text + " ;".join(failed_detail))
2260 db_nsr_update["operational-status"] = "failed"
2261 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2262 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2263 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2264 db_nslcmop_update["statusEnteredTime"] = time()
2265 else:
2266 terminate_ok = True
2267 db_nsr_update["operational-status"] = "terminated"
2268 db_nsr_update["detailed-status"] = "Done"
2269 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2270 db_nslcmop_update["detailed-status"] = "Done"
2271 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2272 db_nslcmop_update["statusEnteredTime"] = time()
2273 if db_nslcmop["operationParams"].get("autoremove"):
2274 autoremove = True
2275
2276 except (ROclient.ROClientException, DbException, LcmException) as e:
2277 self.logger.error(logging_text + "Exit Exception {}".format(e))
2278 exc = e
2279 except asyncio.CancelledError:
2280 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2281 exc = "Operation was cancelled"
2282 except Exception as e:
2283 exc = traceback.format_exc()
2284 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2285 finally:
2286 if exc and db_nslcmop:
2287 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2288 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2289 db_nslcmop_update["statusEnteredTime"] = time()
2290 try:
2291 if db_nslcmop and db_nslcmop_update:
2292 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2293 if db_nsr:
2294 db_nsr_update["_admin.nslcmop"] = None
2295 db_nsr_update["_admin.current-operation"] = None
2296 db_nsr_update["_admin.operation-type"] = None
2297 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2298
2299 if terminate_ok:
2300 ns_state = "IDLE"
2301 error_description = None
2302 error_detail = None
2303 else:
2304 ns_state = "BROKEN"
2305 error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
2306 error_detail = "; ".join(failed_detail)
2307
2308 self._write_ns_status(
2309 nsr_id=nsr_id,
2310 ns_state=ns_state,
2311 current_operation="IDLE",
2312 current_operation_id=None,
2313 error_description=error_description,
2314 error_detail=error_detail
2315 )
2316
2317 except DbException as e:
2318 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2319 if nslcmop_operation_state:
2320 try:
2321 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2322 "operationState": nslcmop_operation_state,
2323 "autoremove": autoremove},
2324 loop=self.loop)
2325 except Exception as e:
2326 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2327
2328 # wait for pending tasks
2329 done = None
2330 pending = None
2331 if pending_tasks:
2332 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2333 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2334 if not pending:
2335 self.logger.debug(logging_text + 'All tasks finished...')
2336 else:
2337 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2338
2339 self.logger.debug(logging_text + "Exit")
2340 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2341
2342 @staticmethod
2343 def _map_primitive_params(primitive_desc, params, instantiation_params):
2344 """
2345 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2346 The default-value is used. If it is between < > it look for a value at instantiation_params
2347 :param primitive_desc: portion of VNFD/NSD that describes primitive
2348 :param params: Params provided by user
2349 :param instantiation_params: Instantiation params provided by user
2350 :return: a dictionary with the calculated params
2351 """
2352 calculated_params = {}
2353 for parameter in primitive_desc.get("parameter", ()):
2354 param_name = parameter["name"]
2355 if param_name in params:
2356 calculated_params[param_name] = params[param_name]
2357 elif "default-value" in parameter or "value" in parameter:
2358 if "value" in parameter:
2359 calculated_params[param_name] = parameter["value"]
2360 else:
2361 calculated_params[param_name] = parameter["default-value"]
2362 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2363 and calculated_params[param_name].endswith(">"):
2364 if calculated_params[param_name][1:-1] in instantiation_params:
2365 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2366 else:
2367 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2368 format(calculated_params[param_name], primitive_desc["name"]))
2369 else:
2370 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2371 format(param_name, primitive_desc["name"]))
2372
2373 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2374 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2375 width=256)
2376 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2377 calculated_params[param_name] = calculated_params[param_name][7:]
2378
2379 # add always ns_config_info if primitive name is config
2380 if primitive_desc["name"] == "config":
2381 if "ns_config_info" in instantiation_params:
2382 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2383 return calculated_params
2384
2385 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2386 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2387
2388 # find vca_deployed record for this action
2389 try:
2390 for vca_deployed in db_deployed["VCA"]:
2391 if not vca_deployed:
2392 continue
2393 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2394 continue
2395 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2396 continue
2397 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2398 continue
2399 break
2400 else:
2401 # vca_deployed not found
2402 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2403 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2404
2405 # get ee_id
2406 ee_id = vca_deployed.get("ee_id")
2407 if not ee_id:
2408 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2409 "execution environment"
2410 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2411
2412 if primitive == "config":
2413 primitive_params = {"params": primitive_params}
2414
2415 while retries >= 0:
2416 try:
2417 output = await self.n2vc.exec_primitive(
2418 ee_id=ee_id,
2419 primitive_name=primitive,
2420 params_dict=primitive_params
2421 )
2422 # execution was OK
2423 break
2424 except Exception as e:
2425 retries -= 1
2426 if retries >= 0:
2427 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2428 # wait and retry
2429 await asyncio.sleep(retries_interval, loop=self.loop)
2430 else:
2431 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2432
2433 return output, 'OK'
2434
2435 except Exception as e:
2436 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2437
2438 async def action(self, nsr_id, nslcmop_id):
2439
2440 # Try to lock HA task here
2441 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2442 if not task_is_locked_by_me:
2443 return
2444
2445 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2446 self.logger.debug(logging_text + "Enter")
2447 # get all needed from database
2448 db_nsr = None
2449 db_nslcmop = None
2450 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2451 "_admin.current-operation": nslcmop_id,
2452 "_admin.operation-type": "action"}
2453 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2454 db_nslcmop_update = {}
2455 nslcmop_operation_state = None
2456 nslcmop_operation_state_detail = None
2457 exc = None
2458 try:
2459 # wait for any previous tasks in process
2460 step = "Waiting for previous operations to terminate"
2461 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2462
2463 self._write_ns_status(
2464 nsr_id=nsr_id,
2465 ns_state=None,
2466 current_operation="RUNNING ACTION",
2467 current_operation_id=nslcmop_id
2468 )
2469
2470 step = "Getting information from database"
2471 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2472 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2473
2474 nsr_deployed = db_nsr["_admin"].get("deployed")
2475 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2476 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2477 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2478 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2479 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2480
2481 if vnf_index:
2482 step = "Getting vnfr from database"
2483 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2484 step = "Getting vnfd from database"
2485 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2486 else:
2487 if db_nsr.get("nsd"):
2488 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2489 else:
2490 step = "Getting nsd from database"
2491 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2492
2493 # for backward compatibility
2494 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2495 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2496 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2497 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2498
2499 primitive = db_nslcmop["operationParams"]["primitive"]
2500 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2501
2502 # look for primitive
2503 config_primitive_desc = None
2504 if vdu_id:
2505 for vdu in get_iterable(db_vnfd, "vdu"):
2506 if vdu_id == vdu["id"]:
2507 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2508 if config_primitive["name"] == primitive:
2509 config_primitive_desc = config_primitive
2510 break
2511 elif kdu_name:
2512 self.logger.debug(logging_text + "Checking actions in KDUs")
2513 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2514 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2515 if primitive_params:
2516 desc_params.update(primitive_params)
2517 # TODO Check if we will need something at vnf level
2518 index = 0
2519 for kdu in get_iterable(nsr_deployed, "K8s"):
2520 if kdu_name == kdu["kdu-name"]:
2521 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2522 "path": "_admin.deployed.K8s.{}".format(index)}
2523 if primitive == "upgrade":
2524 if desc_params.get("kdu_model"):
2525 kdu_model = desc_params.get("kdu_model")
2526 del desc_params["kdu_model"]
2527 else:
2528 kdu_model = kdu.get("kdu-model")
2529 parts = kdu_model.split(sep=":")
2530 if len(parts) == 2:
2531 kdu_model = parts[0]
2532
2533 if kdu.get("k8scluster-type") == "chart":
2534 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2535 kdu_instance=kdu.get("kdu-instance"),
2536 atomic=True, kdu_model=kdu_model,
2537 params=desc_params, db_dict=db_dict,
2538 timeout=300)
2539 elif kdu.get("k8scluster-type") == "juju":
2540 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2541 kdu_instance=kdu.get("kdu-instance"),
2542 atomic=True, kdu_model=kdu_model,
2543 params=desc_params, db_dict=db_dict,
2544 timeout=300)
2545
2546 else:
2547 msg = "k8scluster-type not defined"
2548 raise LcmException(msg)
2549
2550 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2551 break
2552 elif primitive == "rollback":
2553 if kdu.get("k8scluster-type") == "chart":
2554 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2555 kdu_instance=kdu.get("kdu-instance"),
2556 db_dict=db_dict)
2557 elif kdu.get("k8scluster-type") == "juju":
2558 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2559 kdu_instance=kdu.get("kdu-instance"),
2560 db_dict=db_dict)
2561 else:
2562 msg = "k8scluster-type not defined"
2563 raise LcmException(msg)
2564 break
2565 elif primitive == "status":
2566 if kdu.get("k8scluster-type") == "chart":
2567 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2568 kdu_instance=kdu.get("kdu-instance"))
2569 elif kdu.get("k8scluster-type") == "juju":
2570 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2571 kdu_instance=kdu.get("kdu-instance"))
2572 else:
2573 msg = "k8scluster-type not defined"
2574 raise LcmException(msg)
2575 break
2576 index += 1
2577
2578 else:
2579 raise LcmException("KDU '{}' not found".format(kdu_name))
2580 if output:
2581 db_nslcmop_update["detailed-status"] = output
2582 db_nslcmop_update["operationState"] = 'COMPLETED'
2583 db_nslcmop_update["statusEnteredTime"] = time()
2584 else:
2585 db_nslcmop_update["detailed-status"] = ''
2586 db_nslcmop_update["operationState"] = 'FAILED'
2587 db_nslcmop_update["statusEnteredTime"] = time()
2588 return
2589 elif vnf_index:
2590 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2591 if config_primitive["name"] == primitive:
2592 config_primitive_desc = config_primitive
2593 break
2594 else:
2595 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2596 if config_primitive["name"] == primitive:
2597 config_primitive_desc = config_primitive
2598 break
2599
2600 if not config_primitive_desc:
2601 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2602 format(primitive))
2603
2604 desc_params = {}
2605 if vnf_index:
2606 if db_vnfr.get("additionalParamsForVnf"):
2607 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2608 if vdu_id:
2609 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2610 if vdur.get("additionalParams"):
2611 desc_params = self._format_additional_params(vdur["additionalParams"])
2612 else:
2613 if db_nsr.get("additionalParamsForNs"):
2614 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2615
2616 # TODO check if ns is in a proper status
2617 output, detail = await self._ns_execute_primitive(
2618 db_deployed=nsr_deployed,
2619 member_vnf_index=vnf_index,
2620 vdu_id=vdu_id,
2621 vdu_name=vdu_name,
2622 vdu_count_index=vdu_count_index,
2623 primitive=primitive,
2624 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2625
2626 detailed_status = output
2627 if detail == 'OK':
2628 result = 'COMPLETED'
2629 else:
2630 result = 'FAILED'
2631
2632 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2633 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2634 db_nslcmop_update["statusEnteredTime"] = time()
2635 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2636 return # database update is called inside finally
2637
2638 except (DbException, LcmException) as e:
2639 self.logger.error(logging_text + "Exit Exception {}".format(e))
2640 exc = e
2641 except asyncio.CancelledError:
2642 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2643 exc = "Operation was cancelled"
2644 except Exception as e:
2645 exc = traceback.format_exc()
2646 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2647 finally:
2648 if exc and db_nslcmop:
2649 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2650 "FAILED {}: {}".format(step, exc)
2651 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2652 db_nslcmop_update["statusEnteredTime"] = time()
2653 try:
2654 if db_nslcmop_update:
2655 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2656 if db_nsr:
2657 db_nsr_update["_admin.nslcmop"] = None
2658 db_nsr_update["_admin.operation-type"] = None
2659 db_nsr_update["_admin.nslcmop"] = None
2660 db_nsr_update["_admin.current-operation"] = None
2661 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2662 self._write_ns_status(
2663 nsr_id=nsr_id,
2664 ns_state=None,
2665 current_operation="IDLE",
2666 current_operation_id=None
2667 )
2668 except DbException as e:
2669 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2670 self.logger.debug(logging_text + "Exit")
2671 if nslcmop_operation_state:
2672 try:
2673 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2674 "operationState": nslcmop_operation_state},
2675 loop=self.loop)
2676 except Exception as e:
2677 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2678 self.logger.debug(logging_text + "Exit")
2679 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2680 return nslcmop_operation_state, nslcmop_operation_state_detail
2681
2682 async def scale(self, nsr_id, nslcmop_id):
2683
2684 # Try to lock HA task here
2685 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2686 if not task_is_locked_by_me:
2687 return
2688
2689 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2690 self.logger.debug(logging_text + "Enter")
2691 # get all needed from database
2692 db_nsr = None
2693 db_nslcmop = None
2694 db_nslcmop_update = {}
2695 nslcmop_operation_state = None
2696 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2697 "_admin.current-operation": nslcmop_id,
2698 "_admin.operation-type": "scale"}
2699 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2700 exc = None
2701 # in case of error, indicates what part of scale was failed to put nsr at error status
2702 scale_process = None
2703 old_operational_status = ""
2704 old_config_status = ""
2705 vnfr_scaled = False
2706 try:
2707 # wait for any previous tasks in process
2708 step = "Waiting for previous operations to terminate"
2709 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2710
2711 self._write_ns_status(
2712 nsr_id=nsr_id,
2713 ns_state=None,
2714 current_operation="SCALING",
2715 current_operation_id=nslcmop_id
2716 )
2717
2718 step = "Getting nslcmop from database"
2719 self.logger.debug(step + " after having waited for previous tasks to be completed")
2720 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2721 step = "Getting nsr from database"
2722 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2723
2724 old_operational_status = db_nsr["operational-status"]
2725 old_config_status = db_nsr["config-status"]
2726 step = "Parsing scaling parameters"
2727 # self.logger.debug(step)
2728 db_nsr_update["operational-status"] = "scaling"
2729 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2730 nsr_deployed = db_nsr["_admin"].get("deployed")
2731
2732 #######
2733 nsr_deployed = db_nsr["_admin"].get("deployed")
2734 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2735 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2736 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2737 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2738 #######
2739
2740 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2741 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2742 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2743 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2744 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2745
2746 # for backward compatibility
2747 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2748 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2749 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2750 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2751
2752 step = "Getting vnfr from database"
2753 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2754 step = "Getting vnfd from database"
2755 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2756
2757 step = "Getting scaling-group-descriptor"
2758 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2759 if scaling_descriptor["name"] == scaling_group:
2760 break
2761 else:
2762 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2763 "at vnfd:scaling-group-descriptor".format(scaling_group))
2764
2765 # cooldown_time = 0
2766 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2767 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2768 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2769 # break
2770
2771 # TODO check if ns is in a proper status
2772 step = "Sending scale order to VIM"
2773 nb_scale_op = 0
2774 if not db_nsr["_admin"].get("scaling-group"):
2775 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2776 admin_scale_index = 0
2777 else:
2778 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2779 if admin_scale_info["name"] == scaling_group:
2780 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2781 break
2782 else: # not found, set index one plus last element and add new entry with the name
2783 admin_scale_index += 1
2784 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2785 RO_scaling_info = []
2786 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2787 if scaling_type == "SCALE_OUT":
2788 # count if max-instance-count is reached
2789 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2790 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2791 if nb_scale_op >= max_instance_count:
2792 raise LcmException("reached the limit of {} (max-instance-count) "
2793 "scaling-out operations for the "
2794 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2795
2796 nb_scale_op += 1
2797 vdu_scaling_info["scaling_direction"] = "OUT"
2798 vdu_scaling_info["vdu-create"] = {}
2799 for vdu_scale_info in scaling_descriptor["vdu"]:
2800 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2801 "type": "create", "count": vdu_scale_info.get("count", 1)})
2802 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2803
2804 elif scaling_type == "SCALE_IN":
2805 # count if min-instance-count is reached
2806 min_instance_count = 0
2807 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2808 min_instance_count = int(scaling_descriptor["min-instance-count"])
2809 if nb_scale_op <= min_instance_count:
2810 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2811 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2812 nb_scale_op -= 1
2813 vdu_scaling_info["scaling_direction"] = "IN"
2814 vdu_scaling_info["vdu-delete"] = {}
2815 for vdu_scale_info in scaling_descriptor["vdu"]:
2816 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2817 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2818 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2819
2820 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2821 vdu_create = vdu_scaling_info.get("vdu-create")
2822 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2823 if vdu_scaling_info["scaling_direction"] == "IN":
2824 for vdur in reversed(db_vnfr["vdur"]):
2825 if vdu_delete.get(vdur["vdu-id-ref"]):
2826 vdu_delete[vdur["vdu-id-ref"]] -= 1
2827 vdu_scaling_info["vdu"].append({
2828 "name": vdur["name"],
2829 "vdu_id": vdur["vdu-id-ref"],
2830 "interface": []
2831 })
2832 for interface in vdur["interfaces"]:
2833 vdu_scaling_info["vdu"][-1]["interface"].append({
2834 "name": interface["name"],
2835 "ip_address": interface["ip-address"],
2836 "mac_address": interface.get("mac-address"),
2837 })
2838 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2839
2840 # PRE-SCALE BEGIN
2841 step = "Executing pre-scale vnf-config-primitive"
2842 if scaling_descriptor.get("scaling-config-action"):
2843 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2844 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2845 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2846 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2847 step = db_nslcmop_update["detailed-status"] = \
2848 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2849
2850 # look for primitive
2851 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2852 if config_primitive["name"] == vnf_config_primitive:
2853 break
2854 else:
2855 raise LcmException(
2856 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2857 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2858 "primitive".format(scaling_group, config_primitive))
2859
2860 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2861 if db_vnfr.get("additionalParamsForVnf"):
2862 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2863
2864 scale_process = "VCA"
2865 db_nsr_update["config-status"] = "configuring pre-scaling"
2866 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2867
2868 # Pre-scale reintent check: Check if this sub-operation has been executed before
2869 op_index = self._check_or_add_scale_suboperation(
2870 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2871 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2872 # Skip sub-operation
2873 result = 'COMPLETED'
2874 result_detail = 'Done'
2875 self.logger.debug(logging_text +
2876 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2877 vnf_config_primitive, result, result_detail))
2878 else:
2879 if (op_index == self.SUBOPERATION_STATUS_NEW):
2880 # New sub-operation: Get index of this sub-operation
2881 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2882 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2883 format(vnf_config_primitive))
2884 else:
2885 # Reintent: Get registered params for this existing sub-operation
2886 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2887 vnf_index = op.get('member_vnf_index')
2888 vnf_config_primitive = op.get('primitive')
2889 primitive_params = op.get('primitive_params')
2890 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2891 format(vnf_config_primitive))
2892 # Execute the primitive, either with new (first-time) or registered (reintent) args
2893 result, result_detail = await self._ns_execute_primitive(
2894 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2895 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2896 vnf_config_primitive, result, result_detail))
2897 # Update operationState = COMPLETED | FAILED
2898 self._update_suboperation_status(
2899 db_nslcmop, op_index, result, result_detail)
2900
2901 if result == "FAILED":
2902 raise LcmException(result_detail)
2903 db_nsr_update["config-status"] = old_config_status
2904 scale_process = None
2905 # PRE-SCALE END
2906
2907 # SCALE RO - BEGIN
2908 # Should this block be skipped if 'RO_nsr_id' == None ?
2909 # if (RO_nsr_id and RO_scaling_info):
2910 if RO_scaling_info:
2911 scale_process = "RO"
2912 # Scale RO reintent check: Check if this sub-operation has been executed before
2913 op_index = self._check_or_add_scale_suboperation(
2914 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2915 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2916 # Skip sub-operation
2917 result = 'COMPLETED'
2918 result_detail = 'Done'
2919 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2920 result, result_detail))
2921 else:
2922 if (op_index == self.SUBOPERATION_STATUS_NEW):
2923 # New sub-operation: Get index of this sub-operation
2924 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2925 self.logger.debug(logging_text + "New sub-operation RO")
2926 else:
2927 # Reintent: Get registered params for this existing sub-operation
2928 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2929 RO_nsr_id = op.get('RO_nsr_id')
2930 RO_scaling_info = op.get('RO_scaling_info')
2931 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2932 vnf_config_primitive))
2933
2934 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2935 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2936 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2937 # wait until ready
2938 RO_nslcmop_id = RO_desc["instance_action_id"]
2939 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2940
2941 RO_task_done = False
2942 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2943 detailed_status_old = None
2944 self.logger.debug(logging_text + step)
2945
2946 deployment_timeout = 1 * 3600 # One hour
2947 while deployment_timeout > 0:
2948 if not RO_task_done:
2949 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2950 extra_item_id=RO_nslcmop_id)
2951 ns_status, ns_status_info = self.RO.check_action_status(desc)
2952 if ns_status == "ERROR":
2953 raise ROclient.ROClientException(ns_status_info)
2954 elif ns_status == "BUILD":
2955 detailed_status = step + "; {}".format(ns_status_info)
2956 elif ns_status == "ACTIVE":
2957 RO_task_done = True
2958 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2959 self.logger.debug(logging_text + step)
2960 else:
2961 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2962 else:
2963
2964 if ns_status == "ERROR":
2965 raise ROclient.ROClientException(ns_status_info)
2966 elif ns_status == "BUILD":
2967 detailed_status = step + "; {}".format(ns_status_info)
2968 elif ns_status == "ACTIVE":
2969 step = detailed_status = \
2970 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2971 if not vnfr_scaled:
2972 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2973 vnfr_scaled = True
2974 try:
2975 desc = await self.RO.show("ns", RO_nsr_id)
2976 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2977 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2978 break
2979 except LcmExceptionNoMgmtIP:
2980 pass
2981 else:
2982 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2983 if detailed_status != detailed_status_old:
2984 self._update_suboperation_status(
2985 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2986 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2987 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2988
2989 await asyncio.sleep(5, loop=self.loop)
2990 deployment_timeout -= 5
2991 if deployment_timeout <= 0:
2992 self._update_suboperation_status(
2993 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2994 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2995
2996 # update VDU_SCALING_INFO with the obtained ip_addresses
2997 if vdu_scaling_info["scaling_direction"] == "OUT":
2998 for vdur in reversed(db_vnfr["vdur"]):
2999 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3000 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3001 vdu_scaling_info["vdu"].append({
3002 "name": vdur["name"],
3003 "vdu_id": vdur["vdu-id-ref"],
3004 "interface": []
3005 })
3006 for interface in vdur["interfaces"]:
3007 vdu_scaling_info["vdu"][-1]["interface"].append({
3008 "name": interface["name"],
3009 "ip_address": interface["ip-address"],
3010 "mac_address": interface.get("mac-address"),
3011 })
3012 del vdu_scaling_info["vdu-create"]
3013
3014 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3015 # SCALE RO - END
3016
3017 scale_process = None
3018 if db_nsr_update:
3019 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3020
3021 # POST-SCALE BEGIN
3022 # execute primitive service POST-SCALING
3023 step = "Executing post-scale vnf-config-primitive"
3024 if scaling_descriptor.get("scaling-config-action"):
3025 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3026 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3027 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3028 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3029 step = db_nslcmop_update["detailed-status"] = \
3030 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3031
3032 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3033 if db_vnfr.get("additionalParamsForVnf"):
3034 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3035
3036 # look for primitive
3037 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3038 if config_primitive["name"] == vnf_config_primitive:
3039 break
3040 else:
3041 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3042 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3043 "match any vnf-configuration:config-primitive".format(scaling_group,
3044 config_primitive))
3045 scale_process = "VCA"
3046 db_nsr_update["config-status"] = "configuring post-scaling"
3047 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3048
3049 # Post-scale reintent check: Check if this sub-operation has been executed before
3050 op_index = self._check_or_add_scale_suboperation(
3051 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3052 if op_index == self.SUBOPERATION_STATUS_SKIP:
3053 # Skip sub-operation
3054 result = 'COMPLETED'
3055 result_detail = 'Done'
3056 self.logger.debug(logging_text +
3057 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3058 format(vnf_config_primitive, result, result_detail))
3059 else:
3060 if op_index == self.SUBOPERATION_STATUS_NEW:
3061 # New sub-operation: Get index of this sub-operation
3062 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3063 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3064 format(vnf_config_primitive))
3065 else:
3066 # Reintent: Get registered params for this existing sub-operation
3067 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3068 vnf_index = op.get('member_vnf_index')
3069 vnf_config_primitive = op.get('primitive')
3070 primitive_params = op.get('primitive_params')
3071 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3072 format(vnf_config_primitive))
3073 # Execute the primitive, either with new (first-time) or registered (reintent) args
3074 result, result_detail = await self._ns_execute_primitive(
3075 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3076 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3077 vnf_config_primitive, result, result_detail))
3078 # Update operationState = COMPLETED | FAILED
3079 self._update_suboperation_status(
3080 db_nslcmop, op_index, result, result_detail)
3081
3082 if result == "FAILED":
3083 raise LcmException(result_detail)
3084 db_nsr_update["config-status"] = old_config_status
3085 scale_process = None
3086 # POST-SCALE END
3087
3088 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3089 db_nslcmop_update["statusEnteredTime"] = time()
3090 db_nslcmop_update["detailed-status"] = "done"
3091 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3092 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3093 else old_operational_status
3094 db_nsr_update["config-status"] = old_config_status
3095 return
3096 except (ROclient.ROClientException, DbException, LcmException) as e:
3097 self.logger.error(logging_text + "Exit Exception {}".format(e))
3098 exc = e
3099 except asyncio.CancelledError:
3100 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3101 exc = "Operation was cancelled"
3102 except Exception as e:
3103 exc = traceback.format_exc()
3104 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3105 finally:
3106 if exc:
3107 if db_nslcmop:
3108 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3109 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3110 db_nslcmop_update["statusEnteredTime"] = time()
3111 if db_nsr:
3112 db_nsr_update["operational-status"] = old_operational_status
3113 db_nsr_update["config-status"] = old_config_status
3114 db_nsr_update["detailed-status"] = ""
3115 db_nsr_update["_admin.nslcmop"] = None
3116 if scale_process:
3117 if "VCA" in scale_process:
3118 db_nsr_update["config-status"] = "failed"
3119 if "RO" in scale_process:
3120 db_nsr_update["operational-status"] = "failed"
3121 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3122 exc)
3123 try:
3124 if db_nslcmop and db_nslcmop_update:
3125 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3126 if db_nsr:
3127 db_nsr_update["_admin.current-operation"] = None
3128 db_nsr_update["_admin.operation-type"] = None
3129 db_nsr_update["_admin.nslcmop"] = None
3130 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3131
3132 self._write_ns_status(
3133 nsr_id=nsr_id,
3134 ns_state=None,
3135 current_operation="IDLE",
3136 current_operation_id=None
3137 )
3138
3139 except DbException as e:
3140 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3141 if nslcmop_operation_state:
3142 try:
3143 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3144 "operationState": nslcmop_operation_state},
3145 loop=self.loop)
3146 # if cooldown_time:
3147 # await asyncio.sleep(cooldown_time, loop=self.loop)
3148 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3149 except Exception as e:
3150 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3151 self.logger.debug(logging_text + "Exit")
3152 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")