Quick deployment of charms: added apt_mirror and enable_os_upgrade model config options
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
46 def get_iterable(in_dict, in_key):
47 """
48 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
49 :param in_dict: a dictionary
50 :param in_key: the key to look for at in_dict
51 :return: in_dict[in_var] or () if it is None or not present
52 """
53 if not in_dict.get(in_key):
54 return ()
55 return in_dict[in_key]
56
57
58 def populate_dict(target_dict, key_list, value):
59 """
60 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
61 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
62 :param target_dict: dictionary to be changed
63 :param key_list: list of keys to insert at target_dict
64 :param value:
65 :return: None
66 """
67 for key in key_list[0:-1]:
68 if key not in target_dict:
69 target_dict[key] = {}
70 target_dict = target_dict[key]
71 target_dict[key_list[-1]] = value
72
73
74 class NsLcm(LcmBase):
75 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
76 total_deploy_timeout = 2 * 3600 # global timeout for deployment
77 timeout_charm_delete = 10 * 60
78 timeout_primitive = 10 * 60 # timeout for primitive execution
79
80 SUBOPERATION_STATUS_NOT_FOUND = -1
81 SUBOPERATION_STATUS_NEW = -2
82 SUBOPERATION_STATUS_SKIP = -3
83
84 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
85 """
86 Init, Connect to database, filesystem storage, and messaging
87 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
88 :return: None
89 """
90 super().__init__(
91 db=db,
92 msg=msg,
93 fs=fs,
94 logger=logging.getLogger('lcm.ns')
95 )
96
97 self.loop = loop
98 self.lcm_tasks = lcm_tasks
99 self.ro_config = ro_config
100 self.vca_config = vca_config
101 if 'pubkey' in self.vca_config:
102 self.vca_config['public_key'] = self.vca_config['pubkey']
103 if 'cacert' in self.vca_config:
104 self.vca_config['ca_cert'] = self.vca_config['cacert']
105 if 'apiproxy' in self.vca_config:
106 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
107 if 'enableosupgrade' in self.vca_config:
108 if self.vca_config['enableosupgrade'].lower() == 'false':
109 self.vca_config['enable_os_upgrade'] = False
110 elif self.vca_config['enableosupgrade'].lower() == 'true':
111 self.vca_config['enable_os_upgrade'] = True
112 if 'aptmirror' in self.vca_config:
113 self.vca_config['apt_mirror'] = self.vca_config['aptmirror']
114
115 # create N2VC connector
116 self.n2vc = N2VCJujuConnector(
117 db=self.db,
118 fs=self.fs,
119 log=self.logger,
120 loop=self.loop,
121 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
122 username=self.vca_config.get('user', None),
123 vca_config=self.vca_config,
124 on_update_db=self._on_update_n2vc_db,
125 # ca_cert=self.vca_config.get('cacert'),
126 # api_proxy=self.vca_config.get('apiproxy'),
127 )
128
129 self.k8sclusterhelm = K8sHelmConnector(
130 kubectl_command=self.vca_config.get("kubectlpath"),
131 helm_command=self.vca_config.get("helmpath"),
132 fs=self.fs,
133 log=self.logger,
134 db=self.db,
135 on_update_db=None,
136 )
137
138 self.k8sclusterjuju = K8sJujuConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 juju_command=self.vca_config.get("jujupath"),
141 fs=self.fs,
142 log=self.logger,
143 db=self.db,
144 on_update_db=None,
145 )
146
147 # create RO client
148 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
149
150 def _on_update_n2vc_db(self, table, filter, path, updated_data):
151
152 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
153 .format(table, filter, path, updated_data))
154
155 return
156 # write NS status to database
157 # try:
158 # # nsrs_id = filter.get('_id')
159 # # print(nsrs_id)
160 # # get ns record
161 # nsr = self.db.get_one(table=table, q_filter=filter)
162 # # get VCA deployed list
163 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
164 # # get RO deployed
165 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
166 # for vca in vca_list:
167 # # status = vca.get('status')
168 # # print(status)
169 # # detailed_status = vca.get('detailed-status')
170 # # print(detailed_status)
171 # # for ro in ro_list:
172 # # print(ro)
173 #
174 # except Exception as e:
175 # self.logger.error('Error writing NS status to db: {}'.format(e))
176
177 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
178 """
179 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
180 :param vnfd: input vnfd
181 :param new_id: overrides vnf id if provided
182 :param additionalParams: Instantiation params for VNFs provided
183 :param nsrId: Id of the NSR
184 :return: copy of vnfd
185 """
186 try:
187 vnfd_RO = deepcopy(vnfd)
188 # remove unused by RO configuration, monitoring, scaling and internal keys
189 vnfd_RO.pop("_id", None)
190 vnfd_RO.pop("_admin", None)
191 vnfd_RO.pop("vnf-configuration", None)
192 vnfd_RO.pop("monitoring-param", None)
193 vnfd_RO.pop("scaling-group-descriptor", None)
194 vnfd_RO.pop("kdu", None)
195 vnfd_RO.pop("k8s-cluster", None)
196 if new_id:
197 vnfd_RO["id"] = new_id
198
199 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
200 for vdu in get_iterable(vnfd_RO, "vdu"):
201 cloud_init_file = None
202 if vdu.get("cloud-init-file"):
203 base_folder = vnfd["_admin"]["storage"]
204 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
205 vdu["cloud-init-file"])
206 with self.fs.file_open(cloud_init_file, "r") as ci_file:
207 cloud_init_content = ci_file.read()
208 vdu.pop("cloud-init-file", None)
209 elif vdu.get("cloud-init"):
210 cloud_init_content = vdu["cloud-init"]
211 else:
212 continue
213
214 env = Environment()
215 ast = env.parse(cloud_init_content)
216 mandatory_vars = meta.find_undeclared_variables(ast)
217 if mandatory_vars:
218 for var in mandatory_vars:
219 if not additionalParams or var not in additionalParams.keys():
220 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
221 "file, must be provided in the instantiation parameters inside the "
222 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
223 template = Template(cloud_init_content)
224 cloud_init_content = template.render(additionalParams or {})
225 vdu["cloud-init"] = cloud_init_content
226
227 return vnfd_RO
228 except FsException as e:
229 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
230 format(vnfd["id"], vdu["id"], cloud_init_file, e))
231 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
232 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
233 format(vnfd["id"], vdu["id"], e))
234
235 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
236 """
237 Creates a RO ns descriptor from OSM ns_instantiate params
238 :param ns_params: OSM instantiate params
239 :return: The RO ns descriptor
240 """
241 vim_2_RO = {}
242 wim_2_RO = {}
243 # TODO feature 1417: Check that no instantiation is set over PDU
244 # check if PDU forces a concrete vim-network-id and add it
245 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
246
247 def vim_account_2_RO(vim_account):
248 if vim_account in vim_2_RO:
249 return vim_2_RO[vim_account]
250
251 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
252 if db_vim["_admin"]["operationalState"] != "ENABLED":
253 raise LcmException("VIM={} is not available. operationalState={}".format(
254 vim_account, db_vim["_admin"]["operationalState"]))
255 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
256 vim_2_RO[vim_account] = RO_vim_id
257 return RO_vim_id
258
259 def wim_account_2_RO(wim_account):
260 if isinstance(wim_account, str):
261 if wim_account in wim_2_RO:
262 return wim_2_RO[wim_account]
263
264 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
265 if db_wim["_admin"]["operationalState"] != "ENABLED":
266 raise LcmException("WIM={} is not available. operationalState={}".format(
267 wim_account, db_wim["_admin"]["operationalState"]))
268 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
269 wim_2_RO[wim_account] = RO_wim_id
270 return RO_wim_id
271 else:
272 return wim_account
273
274 def ip_profile_2_RO(ip_profile):
275 RO_ip_profile = deepcopy((ip_profile))
276 if "dns-server" in RO_ip_profile:
277 if isinstance(RO_ip_profile["dns-server"], list):
278 RO_ip_profile["dns-address"] = []
279 for ds in RO_ip_profile.pop("dns-server"):
280 RO_ip_profile["dns-address"].append(ds['address'])
281 else:
282 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
283 if RO_ip_profile.get("ip-version") == "ipv4":
284 RO_ip_profile["ip-version"] = "IPv4"
285 if RO_ip_profile.get("ip-version") == "ipv6":
286 RO_ip_profile["ip-version"] = "IPv6"
287 if "dhcp-params" in RO_ip_profile:
288 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
289 return RO_ip_profile
290
291 if not ns_params:
292 return None
293 RO_ns_params = {
294 # "name": ns_params["nsName"],
295 # "description": ns_params.get("nsDescription"),
296 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
297 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
298 # "scenario": ns_params["nsdId"],
299 }
300
301 n2vc_key_list = n2vc_key_list or []
302 for vnfd_ref, vnfd in vnfd_dict.items():
303 vdu_needed_access = []
304 mgmt_cp = None
305 if vnfd.get("vnf-configuration"):
306 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
307 if ssh_required and vnfd.get("mgmt-interface"):
308 if vnfd["mgmt-interface"].get("vdu-id"):
309 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
310 elif vnfd["mgmt-interface"].get("cp"):
311 mgmt_cp = vnfd["mgmt-interface"]["cp"]
312
313 for vdu in vnfd.get("vdu", ()):
314 if vdu.get("vdu-configuration"):
315 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
316 if ssh_required:
317 vdu_needed_access.append(vdu["id"])
318 elif mgmt_cp:
319 for vdu_interface in vdu.get("interface"):
320 if vdu_interface.get("external-connection-point-ref") and \
321 vdu_interface["external-connection-point-ref"] == mgmt_cp:
322 vdu_needed_access.append(vdu["id"])
323 mgmt_cp = None
324 break
325
326 if vdu_needed_access:
327 for vnf_member in nsd.get("constituent-vnfd"):
328 if vnf_member["vnfd-id-ref"] != vnfd_ref:
329 continue
330 for vdu in vdu_needed_access:
331 populate_dict(RO_ns_params,
332 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
333 n2vc_key_list)
334
335 if ns_params.get("vduImage"):
336 RO_ns_params["vduImage"] = ns_params["vduImage"]
337
338 if ns_params.get("ssh_keys"):
339 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
340 for vnf_params in get_iterable(ns_params, "vnf"):
341 for constituent_vnfd in nsd["constituent-vnfd"]:
342 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
343 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
344 break
345 else:
346 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
347 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
348 if vnf_params.get("vimAccountId"):
349 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
350 vim_account_2_RO(vnf_params["vimAccountId"]))
351
352 for vdu_params in get_iterable(vnf_params, "vdu"):
353 # TODO feature 1417: check that this VDU exist and it is not a PDU
354 if vdu_params.get("volume"):
355 for volume_params in vdu_params["volume"]:
356 if volume_params.get("vim-volume-id"):
357 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
358 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
359 volume_params["vim-volume-id"])
360 if vdu_params.get("interface"):
361 for interface_params in vdu_params["interface"]:
362 if interface_params.get("ip-address"):
363 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
364 vdu_params["id"], "interfaces", interface_params["name"],
365 "ip_address"),
366 interface_params["ip-address"])
367 if interface_params.get("mac-address"):
368 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
369 vdu_params["id"], "interfaces", interface_params["name"],
370 "mac_address"),
371 interface_params["mac-address"])
372 if interface_params.get("floating-ip-required"):
373 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
374 vdu_params["id"], "interfaces", interface_params["name"],
375 "floating-ip"),
376 interface_params["floating-ip-required"])
377
378 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
379 if internal_vld_params.get("vim-network-name"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
381 internal_vld_params["name"], "vim-network-name"),
382 internal_vld_params["vim-network-name"])
383 if internal_vld_params.get("vim-network-id"):
384 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
385 internal_vld_params["name"], "vim-network-id"),
386 internal_vld_params["vim-network-id"])
387 if internal_vld_params.get("ip-profile"):
388 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
389 internal_vld_params["name"], "ip-profile"),
390 ip_profile_2_RO(internal_vld_params["ip-profile"]))
391 if internal_vld_params.get("provider-network"):
392
393 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
394 internal_vld_params["name"], "provider-network"),
395 internal_vld_params["provider-network"].copy())
396
397 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
398 # look for interface
399 iface_found = False
400 for vdu_descriptor in vnf_descriptor["vdu"]:
401 for vdu_interface in vdu_descriptor["interface"]:
402 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
403 if icp_params.get("ip-address"):
404 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
405 vdu_descriptor["id"], "interfaces",
406 vdu_interface["name"], "ip_address"),
407 icp_params["ip-address"])
408
409 if icp_params.get("mac-address"):
410 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
411 vdu_descriptor["id"], "interfaces",
412 vdu_interface["name"], "mac_address"),
413 icp_params["mac-address"])
414 iface_found = True
415 break
416 if iface_found:
417 break
418 else:
419 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
420 "internal-vld:id-ref={} is not present at vnfd:internal-"
421 "connection-point".format(vnf_params["member-vnf-index"],
422 icp_params["id-ref"]))
423
424 for vld_params in get_iterable(ns_params, "vld"):
425 if "ip-profile" in vld_params:
426 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
427 ip_profile_2_RO(vld_params["ip-profile"]))
428
429 if vld_params.get("provider-network"):
430
431 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
432 vld_params["provider-network"].copy())
433
434 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
435 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
436 wim_account_2_RO(vld_params["wimAccountId"])),
437 if vld_params.get("vim-network-name"):
438 RO_vld_sites = []
439 if isinstance(vld_params["vim-network-name"], dict):
440 for vim_account, vim_net in vld_params["vim-network-name"].items():
441 RO_vld_sites.append({
442 "netmap-use": vim_net,
443 "datacenter": vim_account_2_RO(vim_account)
444 })
445 else: # isinstance str
446 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
447 if RO_vld_sites:
448 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
449
450 if vld_params.get("vim-network-id"):
451 RO_vld_sites = []
452 if isinstance(vld_params["vim-network-id"], dict):
453 for vim_account, vim_net in vld_params["vim-network-id"].items():
454 RO_vld_sites.append({
455 "netmap-use": vim_net,
456 "datacenter": vim_account_2_RO(vim_account)
457 })
458 else: # isinstance str
459 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
460 if RO_vld_sites:
461 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
462 if vld_params.get("ns-net"):
463 if isinstance(vld_params["ns-net"], dict):
464 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
465 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
466 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
467 if "vnfd-connection-point-ref" in vld_params:
468 for cp_params in vld_params["vnfd-connection-point-ref"]:
469 # look for interface
470 for constituent_vnfd in nsd["constituent-vnfd"]:
471 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
472 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
473 break
474 else:
475 raise LcmException(
476 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
477 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
478 match_cp = False
479 for vdu_descriptor in vnf_descriptor["vdu"]:
480 for interface_descriptor in vdu_descriptor["interface"]:
481 if interface_descriptor.get("external-connection-point-ref") == \
482 cp_params["vnfd-connection-point-ref"]:
483 match_cp = True
484 break
485 if match_cp:
486 break
487 else:
488 raise LcmException(
489 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
490 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
491 cp_params["member-vnf-index-ref"],
492 cp_params["vnfd-connection-point-ref"],
493 vnf_descriptor["id"]))
494 if cp_params.get("ip-address"):
495 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
496 vdu_descriptor["id"], "interfaces",
497 interface_descriptor["name"], "ip_address"),
498 cp_params["ip-address"])
499 if cp_params.get("mac-address"):
500 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
501 vdu_descriptor["id"], "interfaces",
502 interface_descriptor["name"], "mac_address"),
503 cp_params["mac-address"])
504 return RO_ns_params
505
506 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
507 # make a copy to do not change
508 vdu_create = copy(vdu_create)
509 vdu_delete = copy(vdu_delete)
510
511 vdurs = db_vnfr.get("vdur")
512 if vdurs is None:
513 vdurs = []
514 vdu_index = len(vdurs)
515 while vdu_index:
516 vdu_index -= 1
517 vdur = vdurs[vdu_index]
518 if vdur.get("pdu-type"):
519 continue
520 vdu_id_ref = vdur["vdu-id-ref"]
521 if vdu_create and vdu_create.get(vdu_id_ref):
522 for index in range(0, vdu_create[vdu_id_ref]):
523 vdur = deepcopy(vdur)
524 vdur["_id"] = str(uuid4())
525 vdur["count-index"] += 1
526 vdurs.insert(vdu_index+1+index, vdur)
527 del vdu_create[vdu_id_ref]
528 if vdu_delete and vdu_delete.get(vdu_id_ref):
529 del vdurs[vdu_index]
530 vdu_delete[vdu_id_ref] -= 1
531 if not vdu_delete[vdu_id_ref]:
532 del vdu_delete[vdu_id_ref]
533 # check all operations are done
534 if vdu_create or vdu_delete:
535 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
536 vdu_create))
537 if vdu_delete:
538 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
539 vdu_delete))
540
541 vnfr_update = {"vdur": vdurs}
542 db_vnfr["vdur"] = vdurs
543 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
544
545 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
546 """
547 Updates database nsr with the RO info for the created vld
548 :param ns_update_nsr: dictionary to be filled with the updated info
549 :param db_nsr: content of db_nsr. This is also modified
550 :param nsr_desc_RO: nsr descriptor from RO
551 :return: Nothing, LcmException is raised on errors
552 """
553
554 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
555 for net_RO in get_iterable(nsr_desc_RO, "nets"):
556 if vld["id"] != net_RO.get("ns_net_osm_id"):
557 continue
558 vld["vim-id"] = net_RO.get("vim_net_id")
559 vld["name"] = net_RO.get("vim_name")
560 vld["status"] = net_RO.get("status")
561 vld["status-detailed"] = net_RO.get("error_msg")
562 ns_update_nsr["vld.{}".format(vld_index)] = vld
563 break
564 else:
565 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
566
567 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
568 """
569 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
570 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
571 :param nsr_desc_RO: nsr descriptor from RO
572 :return: Nothing, LcmException is raised on errors
573 """
574 for vnf_index, db_vnfr in db_vnfrs.items():
575 for vnf_RO in nsr_desc_RO["vnfs"]:
576 if vnf_RO["member_vnf_index"] != vnf_index:
577 continue
578 vnfr_update = {}
579 if vnf_RO.get("ip_address"):
580 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
581 elif not db_vnfr.get("ip-address"):
582 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
583
584 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
585 vdur_RO_count_index = 0
586 if vdur.get("pdu-type"):
587 continue
588 for vdur_RO in get_iterable(vnf_RO, "vms"):
589 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
590 continue
591 if vdur["count-index"] != vdur_RO_count_index:
592 vdur_RO_count_index += 1
593 continue
594 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
595 if vdur_RO.get("ip_address"):
596 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
597 else:
598 vdur["ip-address"] = None
599 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
600 vdur["name"] = vdur_RO.get("vim_name")
601 vdur["status"] = vdur_RO.get("status")
602 vdur["status-detailed"] = vdur_RO.get("error_msg")
603 for ifacer in get_iterable(vdur, "interfaces"):
604 for interface_RO in get_iterable(vdur_RO, "interfaces"):
605 if ifacer["name"] == interface_RO.get("internal_name"):
606 ifacer["ip-address"] = interface_RO.get("ip_address")
607 ifacer["mac-address"] = interface_RO.get("mac_address")
608 break
609 else:
610 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
611 "from VIM info"
612 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
613 vnfr_update["vdur.{}".format(vdu_index)] = vdur
614 break
615 else:
616 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
617 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
618
619 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
620 for net_RO in get_iterable(nsr_desc_RO, "nets"):
621 if vld["id"] != net_RO.get("vnf_net_osm_id"):
622 continue
623 vld["vim-id"] = net_RO.get("vim_net_id")
624 vld["name"] = net_RO.get("vim_name")
625 vld["status"] = net_RO.get("status")
626 vld["status-detailed"] = net_RO.get("error_msg")
627 vnfr_update["vld.{}".format(vld_index)] = vld
628 break
629 else:
630 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
631 vnf_index, vld["id"]))
632
633 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
634 break
635
636 else:
637 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
638
639 def _get_ns_config_info(self, nsr_id):
640 """
641 Generates a mapping between vnf,vdu elements and the N2VC id
642 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
643 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
644 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
645 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
646 """
647 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
648 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
649 mapping = {}
650 ns_config_info = {"osm-config-mapping": mapping}
651 for vca in vca_deployed_list:
652 if not vca["member-vnf-index"]:
653 continue
654 if not vca["vdu_id"]:
655 mapping[vca["member-vnf-index"]] = vca["application"]
656 else:
657 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
658 vca["application"]
659 return ns_config_info
660
661 @staticmethod
662 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
663 """
664 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
665 primitives as verify-ssh-credentials, or config when needed
666 :param desc_primitive_list: information of the descriptor
667 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
668 this element contains a ssh public key
669 :return: The modified list. Can ba an empty list, but always a list
670 """
671 if desc_primitive_list:
672 primitive_list = desc_primitive_list.copy()
673 else:
674 primitive_list = []
675 # look for primitive config, and get the position. None if not present
676 config_position = None
677 for index, primitive in enumerate(primitive_list):
678 if primitive["name"] == "config":
679 config_position = index
680 break
681
682 # for NS, add always a config primitive if not present (bug 874)
683 if not vca_deployed["member-vnf-index"] and config_position is None:
684 primitive_list.insert(0, {"name": "config", "parameter": []})
685 config_position = 0
686 # for VNF/VDU add verify-ssh-credentials after config
687 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
688 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
689 return primitive_list
690
691 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
692 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
693
694 db_nsr_update = {}
695 RO_descriptor_number = 0 # number of descriptors created at RO
696 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
697 start_deploy = time()
698 vdu_flag = False # If any of the VNFDs has VDUs
699 ns_params = db_nslcmop.get("operationParams")
700
701 # deploy RO
702
703 # get vnfds, instantiate at RO
704
705 for c_vnf in nsd.get("constituent-vnfd", ()):
706 member_vnf_index = c_vnf["member-vnf-index"]
707 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
708 if vnfd.get("vdu"):
709 vdu_flag = True
710 vnfd_ref = vnfd["id"]
711 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
712 " RO".format(vnfd_ref, member_vnf_index)
713 # self.logger.debug(logging_text + step)
714 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
715 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
716 RO_descriptor_number += 1
717
718 # look position at deployed.RO.vnfd if not present it will be appended at the end
719 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
720 if vnf_deployed["member-vnf-index"] == member_vnf_index:
721 break
722 else:
723 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
724 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
725
726 # look if present
727 RO_update = {"member-vnf-index": member_vnf_index}
728 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
729 if vnfd_list:
730 RO_update["id"] = vnfd_list[0]["uuid"]
731 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
732 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
733 else:
734 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
735 get("additionalParamsForVnf"), nsr_id)
736 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
737 RO_update["id"] = desc["uuid"]
738 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
739 vnfd_ref, member_vnf_index, desc["uuid"]))
740 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
741 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
742 self.update_db_2("nsrs", nsr_id, db_nsr_update)
743 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
744
745 # create nsd at RO
746 nsd_ref = nsd["id"]
747 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
748 # self.logger.debug(logging_text + step)
749
750 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
751 RO_descriptor_number += 1
752 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
753 if nsd_list:
754 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
755 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
756 nsd_ref, RO_nsd_uuid))
757 else:
758 nsd_RO = deepcopy(nsd)
759 nsd_RO["id"] = RO_osm_nsd_id
760 nsd_RO.pop("_id", None)
761 nsd_RO.pop("_admin", None)
762 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
763 member_vnf_index = c_vnf["member-vnf-index"]
764 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
765 for c_vld in nsd_RO.get("vld", ()):
766 for cp in c_vld.get("vnfd-connection-point-ref", ()):
767 member_vnf_index = cp["member-vnf-index-ref"]
768 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
769
770 desc = await self.RO.create("nsd", descriptor=nsd_RO)
771 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
772 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
773 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
774 self.update_db_2("nsrs", nsr_id, db_nsr_update)
775 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
776
777 # Crate ns at RO
778 # if present use it unless in error status
779 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
780 if RO_nsr_id:
781 try:
782 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
783 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
784 desc = await self.RO.show("ns", RO_nsr_id)
785 except ROclient.ROClientException as e:
786 if e.http_code != HTTPStatus.NOT_FOUND:
787 raise
788 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
789 if RO_nsr_id:
790 ns_status, ns_status_info = self.RO.check_ns_status(desc)
791 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
792 if ns_status == "ERROR":
793 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
794 .format(RO_nsr_id)
795 self.logger.debug(logging_text + step)
796 await self.RO.delete("ns", RO_nsr_id)
797 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
798 if not RO_nsr_id:
799 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
800 # self.logger.debug(logging_text + step)
801
802 # check if VIM is creating and wait look if previous tasks in process
803 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
804 if task_dependency:
805 step = "Waiting for related tasks to be completed: {}".format(task_name)
806 self.logger.debug(logging_text + step)
807 await asyncio.wait(task_dependency, timeout=3600)
808 if ns_params.get("vnf"):
809 for vnf in ns_params["vnf"]:
810 if "vimAccountId" in vnf:
811 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
812 vnf["vimAccountId"])
813 if task_dependency:
814 step = "Waiting for related tasks to be completed: {}".format(task_name)
815 self.logger.debug(logging_text + step)
816 await asyncio.wait(task_dependency, timeout=3600)
817
818 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
819
820 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
821
822 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
823 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
824 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
825 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
826 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
827 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
828 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
829 self.update_db_2("nsrs", nsr_id, db_nsr_update)
830 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
831
832 # wait until NS is ready
833 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
834 detailed_status_old = None
835 self.logger.debug(logging_text + step)
836
837 while time() <= start_deploy + self.total_deploy_timeout:
838 desc = await self.RO.show("ns", RO_nsr_id)
839 ns_status, ns_status_info = self.RO.check_ns_status(desc)
840 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
841 if ns_status == "ERROR":
842 raise ROclient.ROClientException(ns_status_info)
843 elif ns_status == "BUILD":
844 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
845 elif ns_status == "ACTIVE":
846 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
847 try:
848 if vdu_flag:
849 self.ns_update_vnfr(db_vnfrs, desc)
850 break
851 except LcmExceptionNoMgmtIP:
852 pass
853 else:
854 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
855 if detailed_status != detailed_status_old:
856 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
857 self.update_db_2("nsrs", nsr_id, db_nsr_update)
858 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
859 await asyncio.sleep(5, loop=self.loop)
860 else: # total_deploy_timeout
861 raise ROclient.ROClientException("Timeout waiting ns to be ready")
862
863 step = "Updating NSR"
864 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
865
866 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
867 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
868 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
869 self.update_db_2("nsrs", nsr_id, db_nsr_update)
870 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
871
872 step = "Deployed at VIM"
873 self.logger.debug(logging_text + step)
874
875 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
876 """
877 Wait for ip addres at RO, and optionally, insert public key in virtual machine
878 :param logging_text: prefix use for logging
879 :param nsr_id:
880 :param vnfr_id:
881 :param vdu_id:
882 :param vdu_index:
883 :param pub_key: public ssh key to inject, None to skip
884 :param user: user to apply the public ssh key
885 :return: IP address
886 """
887
888 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
889 ro_nsr_id = None
890 ip_address = None
891 nb_tries = 0
892 target_vdu_id = None
893 ro_retries = 0
894
895 while True:
896
897 ro_retries += 1
898 if ro_retries >= 360: # 1 hour
899 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
900
901 await asyncio.sleep(10, loop=self.loop)
902 # wait until NS is deployed at RO
903 if not ro_nsr_id:
904 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
905 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
906 if not ro_nsr_id:
907 continue
908
909 # get ip address
910 if not target_vdu_id:
911 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
912
913 if not vdu_id: # for the VNF case
914 ip_address = db_vnfr.get("ip-address")
915 if not ip_address:
916 continue
917 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
918 else: # VDU case
919 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
920 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
921
922 if not vdur:
923 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
924 vnfr_id, vdu_id, vdu_index
925 ))
926
927 if vdur.get("status") == "ACTIVE":
928 ip_address = vdur.get("ip-address")
929 if not ip_address:
930 continue
931 target_vdu_id = vdur["vdu-id-ref"]
932 elif vdur.get("status") == "ERROR":
933 raise LcmException("Cannot inject ssh-key because target VM is in error state")
934
935 if not target_vdu_id:
936 continue
937
938 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
939
940 # inject public key into machine
941 if pub_key and user:
942 # self.logger.debug(logging_text + "Inserting RO key")
943 try:
944 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
945 result_dict = await self.RO.create_action(
946 item="ns",
947 item_id_name=ro_nsr_id,
948 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
949 )
950 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
951 if not result_dict or not isinstance(result_dict, dict):
952 raise LcmException("Unknown response from RO when injecting key")
953 for result in result_dict.values():
954 if result.get("vim_result") == 200:
955 break
956 else:
957 raise ROclient.ROClientException("error injecting key: {}".format(
958 result.get("description")))
959 break
960 except ROclient.ROClientException as e:
961 if not nb_tries:
962 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
963 format(e, 20*10))
964 nb_tries += 1
965 if nb_tries >= 20:
966 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
967 else:
968 break
969
970 return ip_address
971
972 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
973 """
974 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
975 """
976 my_vca = vca_deployed_list[vca_index]
977 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
978 return
979 timeout = 300
980 while timeout >= 0:
981 for index, vca_deployed in enumerate(vca_deployed_list):
982 if index == vca_index:
983 continue
984 if not my_vca.get("member-vnf-index") or \
985 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
986 if not vca_deployed.get("instantiation"):
987 break # wait
988 if vca_deployed["instantiation"] == "FAILED":
989 raise LcmException("Configuration aborted because dependent charm/s has failed")
990 else:
991 return
992 await asyncio.sleep(10)
993 timeout -= 1
994 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
995 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
996
997 raise LcmException("Configuration aborted because dependent charm/s timeout")
998
999 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
1000 kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
1001 nsr_id = db_nsr["_id"]
1002 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1003 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1004 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1005 db_dict = {
1006 'collection': 'nsrs',
1007 'filter': {'_id': nsr_id},
1008 'path': db_update_entry
1009 }
1010 step = ""
1011 try:
1012 vnfr_id = None
1013 if db_vnfr:
1014 vnfr_id = db_vnfr["_id"]
1015
1016 namespace = "{nsi}.{ns}".format(
1017 nsi=nsi_id if nsi_id else "",
1018 ns=nsr_id)
1019 if vnfr_id:
1020 namespace += "." + vnfr_id
1021 if vdu_id:
1022 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1023
1024 # Get artifact path
1025 artifact_path = "{}/{}/charms/{}".format(
1026 base_folder["folder"],
1027 base_folder["pkg-dir"],
1028 config_descriptor["juju"]["charm"]
1029 )
1030
1031 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1032 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1033 is_proxy_charm = False
1034
1035 # n2vc_redesign STEP 3.1
1036
1037 # find old ee_id if exists
1038 ee_id = vca_deployed.get("ee_id")
1039
1040 # create or register execution environment in VCA
1041 if is_proxy_charm:
1042 step = "create execution environment"
1043 self.logger.debug(logging_text + step)
1044 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1045 reuse_ee_id=ee_id,
1046 db_dict=db_dict)
1047 else:
1048 step = "Waiting to VM being up and getting IP address"
1049 self.logger.debug(logging_text + step)
1050 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1051 user=None, pub_key=None)
1052 credentials = {"hostname": rw_mgmt_ip}
1053 # get username
1054 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1055 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1056 # merged. Meanwhile let's get username from initial-config-primitive
1057 if not username and config_descriptor.get("initial-config-primitive"):
1058 for config_primitive in config_descriptor["initial-config-primitive"]:
1059 for param in config_primitive.get("parameter", ()):
1060 if param["name"] == "ssh-username":
1061 username = param["value"]
1062 break
1063 if not username:
1064 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1065 "'config-access.ssh-access.default-user'")
1066 credentials["username"] = username
1067 # n2vc_redesign STEP 3.2
1068
1069 step = "register execution environment {}".format(credentials)
1070 self.logger.debug(logging_text + step)
1071 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1072 db_dict=db_dict)
1073
1074 # for compatibility with MON/POL modules, the need model and application name at database
1075 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1076 ee_id_parts = ee_id.split('.')
1077 model_name = ee_id_parts[0]
1078 application_name = ee_id_parts[1]
1079 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1080 db_update_entry + "application": application_name,
1081 db_update_entry + "ee_id": ee_id})
1082
1083 # n2vc_redesign STEP 3.3
1084
1085 step = "Install configuration Software"
1086 # TODO check if already done
1087 self.logger.debug(logging_text + step)
1088 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1089
1090 # if SSH access is required, then get execution environment SSH public
1091 if is_proxy_charm: # if native charm we have waited already to VM be UP
1092 pub_key = None
1093 user = None
1094 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1095 # Needed to inject a ssh key
1096 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1097 step = "Install configuration Software, getting public ssh key"
1098 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1099
1100 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1101 else:
1102 step = "Waiting to VM being up and getting IP address"
1103 self.logger.debug(logging_text + step)
1104
1105 # n2vc_redesign STEP 5.1
1106 # wait for RO (ip-address) Insert pub_key into VM
1107 if vnfr_id:
1108 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1109 user=user, pub_key=pub_key)
1110 else:
1111 rw_mgmt_ip = None # This is for a NS configuration
1112
1113 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1114
1115 # store rw_mgmt_ip in deploy params for later replacement
1116 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1117
1118 # n2vc_redesign STEP 6 Execute initial config primitive
1119 step = 'execute initial config primitive'
1120 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1121
1122 # sort initial config primitives by 'seq'
1123 try:
1124 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1125 except Exception as e:
1126 self.logger.error(logging_text + step + ": " + str(e))
1127
1128 # add config if not present for NS charm
1129 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1130 vca_deployed)
1131 if initial_config_primitive_list:
1132 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1133 for initial_config_primitive in initial_config_primitive_list:
1134 # adding information on the vca_deployed if it is a NS execution environment
1135 if not vca_deployed["member-vnf-index"]:
1136 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1137 # TODO check if already done
1138 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1139
1140 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1141 self.logger.debug(logging_text + step)
1142 await self.n2vc.exec_primitive(
1143 ee_id=ee_id,
1144 primitive_name=initial_config_primitive["name"],
1145 params_dict=primitive_params_,
1146 db_dict=db_dict
1147 )
1148 # TODO register in database that primitive is done
1149
1150 step = "instantiated at VCA"
1151 self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "COMPLETED"})
1152 self.logger.debug(logging_text + step)
1153
1154 except Exception as e: # TODO not use Exception but N2VC exception
1155 self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1156 raise Exception("{} {}".format(step, e)) from e
1157 # TODO raise N2VC exception with 'step' extra information
1158
1159 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1160 error_description: str = None, error_detail: str = None):
1161 try:
1162 db_dict = dict()
1163 if ns_state:
1164 db_dict["nsState"] = ns_state
1165 db_dict["currentOperation"] = current_operation
1166 db_dict["currentOperationID"] = current_operation_id
1167 db_dict["errorDescription"] = error_description
1168 db_dict["errorDetail"] = error_detail
1169 self.update_db_2("nsrs", nsr_id, db_dict)
1170 except Exception as e:
1171 self.logger.warn('Error writing NS status: {}'.format(e))
1172
1173 async def instantiate(self, nsr_id, nslcmop_id):
1174 """
1175
1176 :param nsr_id: ns instance to deploy
1177 :param nslcmop_id: operation to run
1178 :return:
1179 """
1180
1181 # Try to lock HA task here
1182 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1183 if not task_is_locked_by_me:
1184 self.logger.debug('instantiate() task is not locked by me')
1185 return
1186
1187 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1188 self.logger.debug(logging_text + "Enter")
1189
1190 # get all needed from database
1191
1192 # database nsrs record
1193 db_nsr = None
1194
1195 # database nslcmops record
1196 db_nslcmop = None
1197
1198 # update operation on nsrs
1199 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1200 "_admin.current-operation": nslcmop_id,
1201 "_admin.operation-type": "instantiate"}
1202 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1203
1204 # update operation on nslcmops
1205 db_nslcmop_update = {}
1206
1207 nslcmop_operation_state = None
1208 db_vnfrs = {} # vnf's info indexed by member-index
1209 # n2vc_info = {}
1210 task_instantiation_list = []
1211 task_instantiation_info = {} # from task to info text
1212 exc = None
1213 try:
1214 # wait for any previous tasks in process
1215 step = "Waiting for previous operations to terminate"
1216 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1217
1218 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1219
1220 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1221 self._write_ns_status(
1222 nsr_id=nsr_id,
1223 ns_state="BUILDING",
1224 current_operation="INSTANTIATING",
1225 current_operation_id=nslcmop_id
1226 )
1227
1228 # read from db: operation
1229 step = "Getting nslcmop={} from db".format(nslcmop_id)
1230 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1231
1232 # read from db: ns
1233 step = "Getting nsr={} from db".format(nsr_id)
1234 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1235 # nsd is replicated into ns (no db read)
1236 nsd = db_nsr["nsd"]
1237 # nsr_name = db_nsr["name"] # TODO short-name??
1238
1239 # read from db: vnf's of this ns
1240 step = "Getting vnfrs from db"
1241 self.logger.debug(logging_text + step)
1242 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1243
1244 # read from db: vnfd's for every vnf
1245 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1246 db_vnfds = {} # every vnfd data indexed by vnf id
1247 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1248
1249 # for each vnf in ns, read vnfd
1250 for vnfr in db_vnfrs_list:
1251 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1252 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1253 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1254 # if we haven't this vnfd, read it from db
1255 if vnfd_id not in db_vnfds:
1256 # read from cb
1257 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1258 self.logger.debug(logging_text + step)
1259 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1260
1261 # store vnfd
1262 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1263 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1264 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1265
1266 # Get or generates the _admin.deployed.VCA list
1267 vca_deployed_list = None
1268 if db_nsr["_admin"].get("deployed"):
1269 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1270 if vca_deployed_list is None:
1271 vca_deployed_list = []
1272 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1273 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1274 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1275 elif isinstance(vca_deployed_list, dict):
1276 # maintain backward compatibility. Change a dict to list at database
1277 vca_deployed_list = list(vca_deployed_list.values())
1278 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1279 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1280
1281 db_nsr_update["detailed-status"] = "creating"
1282 db_nsr_update["operational-status"] = "init"
1283
1284 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1285 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1286 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1287
1288 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1289 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1290 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1291 self.logger.debug(logging_text + "Before deploy_kdus")
1292 # Call to deploy_kdus in case exists the "vdu:kdu" param
1293 task_kdu = asyncio.ensure_future(
1294 self.deploy_kdus(
1295 logging_text=logging_text,
1296 nsr_id=nsr_id,
1297 db_nsr=db_nsr,
1298 db_vnfrs=db_vnfrs,
1299 )
1300 )
1301 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1302 task_instantiation_info[task_kdu] = "Deploy KDUs"
1303 task_instantiation_list.append(task_kdu)
1304 # n2vc_redesign STEP 1 Get VCA public ssh-key
1305 # feature 1429. Add n2vc public key to needed VMs
1306 n2vc_key = self.n2vc.get_public_key()
1307 n2vc_key_list = [n2vc_key]
1308 if self.vca_config.get("public_key"):
1309 n2vc_key_list.append(self.vca_config["public_key"])
1310
1311 # n2vc_redesign STEP 2 Deploy Network Scenario
1312 task_ro = asyncio.ensure_future(
1313 self.instantiate_RO(
1314 logging_text=logging_text,
1315 nsr_id=nsr_id,
1316 nsd=nsd,
1317 db_nsr=db_nsr,
1318 db_nslcmop=db_nslcmop,
1319 db_vnfrs=db_vnfrs,
1320 db_vnfds_ref=db_vnfds_ref,
1321 n2vc_key_list=n2vc_key_list
1322 )
1323 )
1324 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1325 task_instantiation_info[task_ro] = "Deploy at VIM"
1326 task_instantiation_list.append(task_ro)
1327
1328 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1329 step = "Looking for needed vnfd to configure with proxy charm"
1330 self.logger.debug(logging_text + step)
1331
1332 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1333 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1334 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1335 vnfd_id = c_vnf["vnfd-id-ref"]
1336 vnfd = db_vnfds_ref[vnfd_id]
1337 member_vnf_index = str(c_vnf["member-vnf-index"])
1338 db_vnfr = db_vnfrs[member_vnf_index]
1339 base_folder = vnfd["_admin"]["storage"]
1340 vdu_id = None
1341 vdu_index = 0
1342 vdu_name = None
1343 kdu_name = None
1344
1345 # Get additional parameters
1346 deploy_params = {}
1347 if db_vnfr.get("additionalParamsForVnf"):
1348 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1349
1350 descriptor_config = vnfd.get("vnf-configuration")
1351 if descriptor_config and descriptor_config.get("juju"):
1352 self._deploy_n2vc(
1353 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1354 db_nsr=db_nsr,
1355 db_vnfr=db_vnfr,
1356 nslcmop_id=nslcmop_id,
1357 nsr_id=nsr_id,
1358 nsi_id=nsi_id,
1359 vnfd_id=vnfd_id,
1360 vdu_id=vdu_id,
1361 kdu_name=kdu_name,
1362 member_vnf_index=member_vnf_index,
1363 vdu_index=vdu_index,
1364 vdu_name=vdu_name,
1365 deploy_params=deploy_params,
1366 descriptor_config=descriptor_config,
1367 base_folder=base_folder,
1368 task_instantiation_list=task_instantiation_list,
1369 task_instantiation_info=task_instantiation_info
1370 )
1371
1372 # Deploy charms for each VDU that supports one.
1373 for vdud in get_iterable(vnfd, 'vdu'):
1374 vdu_id = vdud["id"]
1375 descriptor_config = vdud.get('vdu-configuration')
1376 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1377 if vdur.get("additionalParams"):
1378 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1379 else:
1380 deploy_params_vdu = deploy_params
1381 if descriptor_config and descriptor_config.get("juju"):
1382 # look for vdu index in the db_vnfr["vdu"] section
1383 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1384 # if vdur["vdu-id-ref"] == vdu_id:
1385 # break
1386 # else:
1387 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1388 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1389 # vdu_name = vdur.get("name")
1390 vdu_name = None
1391 kdu_name = None
1392 for vdu_index in range(int(vdud.get("count", 1))):
1393 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1394 self._deploy_n2vc(
1395 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1396 member_vnf_index, vdu_id, vdu_index),
1397 db_nsr=db_nsr,
1398 db_vnfr=db_vnfr,
1399 nslcmop_id=nslcmop_id,
1400 nsr_id=nsr_id,
1401 nsi_id=nsi_id,
1402 vnfd_id=vnfd_id,
1403 vdu_id=vdu_id,
1404 kdu_name=kdu_name,
1405 member_vnf_index=member_vnf_index,
1406 vdu_index=vdu_index,
1407 vdu_name=vdu_name,
1408 deploy_params=deploy_params_vdu,
1409 descriptor_config=descriptor_config,
1410 base_folder=base_folder,
1411 task_instantiation_list=task_instantiation_list,
1412 task_instantiation_info=task_instantiation_info
1413 )
1414 for kdud in get_iterable(vnfd, 'kdu'):
1415 kdu_name = kdud["name"]
1416 descriptor_config = kdud.get('kdu-configuration')
1417 if descriptor_config and descriptor_config.get("juju"):
1418 vdu_id = None
1419 vdu_index = 0
1420 vdu_name = None
1421 # look for vdu index in the db_vnfr["vdu"] section
1422 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1423 # if vdur["vdu-id-ref"] == vdu_id:
1424 # break
1425 # else:
1426 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1427 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1428 # vdu_name = vdur.get("name")
1429 # vdu_name = None
1430
1431 self._deploy_n2vc(
1432 logging_text=logging_text,
1433 db_nsr=db_nsr,
1434 db_vnfr=db_vnfr,
1435 nslcmop_id=nslcmop_id,
1436 nsr_id=nsr_id,
1437 nsi_id=nsi_id,
1438 vnfd_id=vnfd_id,
1439 vdu_id=vdu_id,
1440 kdu_name=kdu_name,
1441 member_vnf_index=member_vnf_index,
1442 vdu_index=vdu_index,
1443 vdu_name=vdu_name,
1444 deploy_params=deploy_params,
1445 descriptor_config=descriptor_config,
1446 base_folder=base_folder,
1447 task_instantiation_list=task_instantiation_list,
1448 task_instantiation_info=task_instantiation_info
1449 )
1450
1451 # Check if this NS has a charm configuration
1452 descriptor_config = nsd.get("ns-configuration")
1453 if descriptor_config and descriptor_config.get("juju"):
1454 vnfd_id = None
1455 db_vnfr = None
1456 member_vnf_index = None
1457 vdu_id = None
1458 kdu_name = None
1459 vdu_index = 0
1460 vdu_name = None
1461
1462 # Get additional parameters
1463 deploy_params = {}
1464 if db_nsr.get("additionalParamsForNs"):
1465 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1466 base_folder = nsd["_admin"]["storage"]
1467 self._deploy_n2vc(
1468 logging_text=logging_text,
1469 db_nsr=db_nsr,
1470 db_vnfr=db_vnfr,
1471 nslcmop_id=nslcmop_id,
1472 nsr_id=nsr_id,
1473 nsi_id=nsi_id,
1474 vnfd_id=vnfd_id,
1475 vdu_id=vdu_id,
1476 kdu_name=kdu_name,
1477 member_vnf_index=member_vnf_index,
1478 vdu_index=vdu_index,
1479 vdu_name=vdu_name,
1480 deploy_params=deploy_params,
1481 descriptor_config=descriptor_config,
1482 base_folder=base_folder,
1483 task_instantiation_list=task_instantiation_list,
1484 task_instantiation_info=task_instantiation_info
1485 )
1486
1487 # Wait until all tasks of "task_instantiation_list" have been finished
1488
1489 # while time() <= start_deploy + self.total_deploy_timeout:
1490 error_text_list = []
1491 timeout = 3600
1492
1493 # let's begin with all OK
1494 instantiated_ok = True
1495 # let's begin with RO 'running' status (later we can change it)
1496 db_nsr_update["operational-status"] = "running"
1497 # let's begin with VCA 'configured' status (later we can change it)
1498 db_nsr_update["config-status"] = "configured"
1499
1500 if task_instantiation_list:
1501 # wait for all tasks completion
1502 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1503
1504 for task in pending:
1505 instantiated_ok = False
1506 if task == task_ro:
1507 db_nsr_update["operational-status"] = "failed"
1508 else:
1509 db_nsr_update["config-status"] = "failed"
1510 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1511 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1512 for task in done:
1513 if task.cancelled():
1514 instantiated_ok = False
1515 if task == task_ro:
1516 db_nsr_update["operational-status"] = "failed"
1517 else:
1518 db_nsr_update["config-status"] = "failed"
1519 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1520 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1521 else:
1522 exc = task.exception()
1523 if exc:
1524 instantiated_ok = False
1525 if task == task_ro:
1526 db_nsr_update["operational-status"] = "failed"
1527 else:
1528 db_nsr_update["config-status"] = "failed"
1529 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1530 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1531 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1532 else:
1533 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1534 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1535 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1536 else:
1537 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1538
1539 if error_text_list:
1540 error_text = "\n".join(error_text_list)
1541 db_nsr_update["detailed-status"] = error_text
1542 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1543 db_nslcmop_update["detailed-status"] = error_text
1544 db_nslcmop_update["statusEnteredTime"] = time()
1545 else:
1546 # all is done
1547 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1548 db_nslcmop_update["statusEnteredTime"] = time()
1549 db_nslcmop_update["detailed-status"] = "done"
1550 db_nsr_update["detailed-status"] = "done"
1551
1552 except (ROclient.ROClientException, DbException, LcmException) as e:
1553 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1554 exc = e
1555 except asyncio.CancelledError:
1556 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1557 exc = "Operation was cancelled"
1558 except Exception as e:
1559 exc = traceback.format_exc()
1560 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1561 exc_info=True)
1562 finally:
1563 if exc:
1564 if db_nsr:
1565 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1566 db_nsr_update["operational-status"] = "failed"
1567 db_nsr_update["config-status"] = "failed"
1568 if db_nslcmop:
1569 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1570 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1571 db_nslcmop_update["statusEnteredTime"] = time()
1572 try:
1573 if db_nsr:
1574 db_nsr_update["_admin.nslcmop"] = None
1575 db_nsr_update["_admin.current-operation"] = None
1576 db_nsr_update["_admin.operation-type"] = None
1577 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1578
1579 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1580 ns_state = None
1581 error_description = None
1582 error_detail = None
1583 if instantiated_ok:
1584 ns_state = "READY"
1585 else:
1586 ns_state = "BROKEN"
1587 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1588 error_detail = error_text
1589 self._write_ns_status(
1590 nsr_id=nsr_id,
1591 ns_state=ns_state,
1592 current_operation="IDLE",
1593 current_operation_id=None,
1594 error_description=error_description,
1595 error_detail=error_detail
1596 )
1597
1598 if db_nslcmop_update:
1599 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1600 except DbException as e:
1601 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1602 if nslcmop_operation_state:
1603 try:
1604 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1605 "operationState": nslcmop_operation_state},
1606 loop=self.loop)
1607 except Exception as e:
1608 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1609
1610 self.logger.debug(logging_text + "Exit")
1611 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1612
1613 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1614 # Launch kdus if present in the descriptor
1615
1616 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1617
1618 def _get_cluster_id(cluster_id, cluster_type):
1619 nonlocal k8scluster_id_2_uuic
1620 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1621 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1622
1623 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1624 if not db_k8scluster:
1625 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1626 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1627 if not k8s_id:
1628 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1629 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1630 return k8s_id
1631
1632 logging_text += "Deploy kdus: "
1633 try:
1634 db_nsr_update = {"_admin.deployed.K8s": []}
1635 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1636
1637 # Look for all vnfds
1638 pending_tasks = {}
1639 index = 0
1640 for vnfr_data in db_vnfrs.values():
1641 for kdur in get_iterable(vnfr_data, "kdur"):
1642 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1643 kdumodel = None
1644 k8sclustertype = None
1645 error_text = None
1646 cluster_uuid = None
1647 if kdur.get("helm-chart"):
1648 kdumodel = kdur["helm-chart"]
1649 k8sclustertype = "chart"
1650 k8sclustertype_full = "helm-chart"
1651 elif kdur.get("juju-bundle"):
1652 kdumodel = kdur["juju-bundle"]
1653 k8sclustertype = "juju"
1654 k8sclustertype_full = "juju-bundle"
1655 else:
1656 error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
1657 " running"
1658 try:
1659 if not error_text:
1660 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1661 except LcmException as e:
1662 error_text = str(e)
1663 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1664
1665 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1666 "k8scluster-type": k8sclustertype,
1667 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1668 if error_text:
1669 k8s_instace_info["detailed-status"] = error_text
1670 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1671 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1672 if error_text:
1673 continue
1674
1675 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1676 "{}".format(index)}
1677 if k8sclustertype == "chart":
1678 task = asyncio.ensure_future(
1679 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1680 params=desc_params, db_dict=db_dict, timeout=3600)
1681 )
1682 else:
1683 task = asyncio.ensure_future(
1684 self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1685 atomic=True, params=desc_params,
1686 db_dict=db_dict, timeout=600)
1687 )
1688
1689 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1690 index += 1
1691 if not pending_tasks:
1692 return
1693 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1694 pending_list = list(pending_tasks.keys())
1695 while pending_list:
1696 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1697 return_when=asyncio.FIRST_COMPLETED)
1698 if not done_list: # timeout
1699 for task in pending_list:
1700 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1701 break
1702 for task in done_list:
1703 exc = task.exception()
1704 if exc:
1705 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1706 else:
1707 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1708
1709 except Exception as e:
1710 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1711 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1712 finally:
1713 # TODO Write in data base
1714 if db_nsr_update:
1715 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1716
1717 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1718 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1719 base_folder, task_instantiation_list, task_instantiation_info):
1720 # launch instantiate_N2VC in a asyncio task and register task object
1721 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1722 # if not found, create one entry and update database
1723
1724 # fill db_nsr._admin.deployed.VCA.<index>
1725 vca_index = -1
1726 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1727 if not vca_deployed:
1728 continue
1729 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1730 vca_deployed.get("vdu_id") == vdu_id and \
1731 vca_deployed.get("kdu_name") == kdu_name and \
1732 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1733 break
1734 else:
1735 # not found, create one.
1736 vca_deployed = {
1737 "member-vnf-index": member_vnf_index,
1738 "vdu_id": vdu_id,
1739 "kdu_name": kdu_name,
1740 "vdu_count_index": vdu_index,
1741 "operational-status": "init", # TODO revise
1742 "detailed-status": "", # TODO revise
1743 "step": "initial-deploy", # TODO revise
1744 "vnfd_id": vnfd_id,
1745 "vdu_name": vdu_name,
1746 }
1747 vca_index += 1
1748 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1749 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1750
1751 # Launch task
1752 task_n2vc = asyncio.ensure_future(
1753 self.instantiate_N2VC(
1754 logging_text=logging_text,
1755 vca_index=vca_index,
1756 nsi_id=nsi_id,
1757 db_nsr=db_nsr,
1758 db_vnfr=db_vnfr,
1759 vdu_id=vdu_id,
1760 kdu_name=kdu_name,
1761 vdu_index=vdu_index,
1762 deploy_params=deploy_params,
1763 config_descriptor=descriptor_config,
1764 base_folder=base_folder,
1765 )
1766 )
1767 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1768 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1769 task_instantiation_list.append(task_n2vc)
1770
1771 # Check if this VNFD has a configured terminate action
1772 def _has_terminate_config_primitive(self, vnfd):
1773 vnf_config = vnfd.get("vnf-configuration")
1774 if vnf_config and vnf_config.get("terminate-config-primitive"):
1775 return True
1776 else:
1777 return False
1778
1779 @staticmethod
1780 def _get_terminate_config_primitive_seq_list(vnfd):
1781 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1782 # No need to check for existing primitive twice, already done before
1783 vnf_config = vnfd.get("vnf-configuration")
1784 seq_list = vnf_config.get("terminate-config-primitive")
1785 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1786 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1787 return seq_list_sorted
1788
1789 @staticmethod
1790 def _create_nslcmop(nsr_id, operation, params):
1791 """
1792 Creates a ns-lcm-opp content to be stored at database.
1793 :param nsr_id: internal id of the instance
1794 :param operation: instantiate, terminate, scale, action, ...
1795 :param params: user parameters for the operation
1796 :return: dictionary following SOL005 format
1797 """
1798 # Raise exception if invalid arguments
1799 if not (nsr_id and operation and params):
1800 raise LcmException(
1801 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1802 now = time()
1803 _id = str(uuid4())
1804 nslcmop = {
1805 "id": _id,
1806 "_id": _id,
1807 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1808 "operationState": "PROCESSING",
1809 "statusEnteredTime": now,
1810 "nsInstanceId": nsr_id,
1811 "lcmOperationType": operation,
1812 "startTime": now,
1813 "isAutomaticInvocation": False,
1814 "operationParams": params,
1815 "isCancelPending": False,
1816 "links": {
1817 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1818 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1819 }
1820 }
1821 return nslcmop
1822
1823 def _format_additional_params(self, params):
1824 params = params or {}
1825 for key, value in params.items():
1826 if str(value).startswith("!!yaml "):
1827 params[key] = yaml.safe_load(value[7:])
1828 return params
1829
1830 def _get_terminate_primitive_params(self, seq, vnf_index):
1831 primitive = seq.get('name')
1832 primitive_params = {}
1833 params = {
1834 "member_vnf_index": vnf_index,
1835 "primitive": primitive,
1836 "primitive_params": primitive_params,
1837 }
1838 desc_params = {}
1839 return self._map_primitive_params(seq, params, desc_params)
1840
1841 # sub-operations
1842
1843 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1844 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1845 if (op.get('operationState') == 'COMPLETED'):
1846 # b. Skip sub-operation
1847 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1848 return self.SUBOPERATION_STATUS_SKIP
1849 else:
1850 # c. Reintent executing sub-operation
1851 # The sub-operation exists, and operationState != 'COMPLETED'
1852 # Update operationState = 'PROCESSING' to indicate a reintent.
1853 operationState = 'PROCESSING'
1854 detailed_status = 'In progress'
1855 self._update_suboperation_status(
1856 db_nslcmop, op_index, operationState, detailed_status)
1857 # Return the sub-operation index
1858 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1859 # with arguments extracted from the sub-operation
1860 return op_index
1861
1862 # Find a sub-operation where all keys in a matching dictionary must match
1863 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1864 def _find_suboperation(self, db_nslcmop, match):
1865 if (db_nslcmop and match):
1866 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1867 for i, op in enumerate(op_list):
1868 if all(op.get(k) == match[k] for k in match):
1869 return i
1870 return self.SUBOPERATION_STATUS_NOT_FOUND
1871
1872 # Update status for a sub-operation given its index
1873 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1874 # Update DB for HA tasks
1875 q_filter = {'_id': db_nslcmop['_id']}
1876 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1877 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1878 self.db.set_one("nslcmops",
1879 q_filter=q_filter,
1880 update_dict=update_dict,
1881 fail_on_empty=False)
1882
1883 # Add sub-operation, return the index of the added sub-operation
1884 # Optionally, set operationState, detailed-status, and operationType
1885 # Status and type are currently set for 'scale' sub-operations:
1886 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1887 # 'detailed-status' : status message
1888 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1889 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1890 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1891 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1892 RO_nsr_id=None, RO_scaling_info=None):
1893 if not (db_nslcmop):
1894 return self.SUBOPERATION_STATUS_NOT_FOUND
1895 # Get the "_admin.operations" list, if it exists
1896 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1897 op_list = db_nslcmop_admin.get('operations')
1898 # Create or append to the "_admin.operations" list
1899 new_op = {'member_vnf_index': vnf_index,
1900 'vdu_id': vdu_id,
1901 'vdu_count_index': vdu_count_index,
1902 'primitive': primitive,
1903 'primitive_params': mapped_primitive_params}
1904 if operationState:
1905 new_op['operationState'] = operationState
1906 if detailed_status:
1907 new_op['detailed-status'] = detailed_status
1908 if operationType:
1909 new_op['lcmOperationType'] = operationType
1910 if RO_nsr_id:
1911 new_op['RO_nsr_id'] = RO_nsr_id
1912 if RO_scaling_info:
1913 new_op['RO_scaling_info'] = RO_scaling_info
1914 if not op_list:
1915 # No existing operations, create key 'operations' with current operation as first list element
1916 db_nslcmop_admin.update({'operations': [new_op]})
1917 op_list = db_nslcmop_admin.get('operations')
1918 else:
1919 # Existing operations, append operation to list
1920 op_list.append(new_op)
1921
1922 db_nslcmop_update = {'_admin.operations': op_list}
1923 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1924 op_index = len(op_list) - 1
1925 return op_index
1926
1927 # Helper methods for scale() sub-operations
1928
1929 # pre-scale/post-scale:
1930 # Check for 3 different cases:
1931 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1932 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1933 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1934 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1935 operationType, RO_nsr_id=None, RO_scaling_info=None):
1936 # Find this sub-operation
1937 if (RO_nsr_id and RO_scaling_info):
1938 operationType = 'SCALE-RO'
1939 match = {
1940 'member_vnf_index': vnf_index,
1941 'RO_nsr_id': RO_nsr_id,
1942 'RO_scaling_info': RO_scaling_info,
1943 }
1944 else:
1945 match = {
1946 'member_vnf_index': vnf_index,
1947 'primitive': vnf_config_primitive,
1948 'primitive_params': primitive_params,
1949 'lcmOperationType': operationType
1950 }
1951 op_index = self._find_suboperation(db_nslcmop, match)
1952 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1953 # a. New sub-operation
1954 # The sub-operation does not exist, add it.
1955 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1956 # The following parameters are set to None for all kind of scaling:
1957 vdu_id = None
1958 vdu_count_index = None
1959 vdu_name = None
1960 if (RO_nsr_id and RO_scaling_info):
1961 vnf_config_primitive = None
1962 primitive_params = None
1963 else:
1964 RO_nsr_id = None
1965 RO_scaling_info = None
1966 # Initial status for sub-operation
1967 operationState = 'PROCESSING'
1968 detailed_status = 'In progress'
1969 # Add sub-operation for pre/post-scaling (zero or more operations)
1970 self._add_suboperation(db_nslcmop,
1971 vnf_index,
1972 vdu_id,
1973 vdu_count_index,
1974 vdu_name,
1975 vnf_config_primitive,
1976 primitive_params,
1977 operationState,
1978 detailed_status,
1979 operationType,
1980 RO_nsr_id,
1981 RO_scaling_info)
1982 return self.SUBOPERATION_STATUS_NEW
1983 else:
1984 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1985 # or op_index (operationState != 'COMPLETED')
1986 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1987
1988 # Function to return execution_environment id
1989
1990 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
1991 for vca in vca_deployed_list:
1992 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
1993 return vca["ee_id"]
1994
1995 # Helper methods for terminate()
1996
1997 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1998 """ Create a primitive with params from VNFD
1999 Called from terminate() before deleting instance
2000 Calls action() to execute the primitive """
2001 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
2002 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2003 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
2004 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2005 db_vnfds = {}
2006 # Loop over VNFRs
2007 for vnfr in db_vnfrs_list:
2008 vnfd_id = vnfr["vnfd-id"]
2009 vnf_index = vnfr["member-vnf-index-ref"]
2010 if vnfd_id not in db_vnfds:
2011 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
2012 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2013 db_vnfds[vnfd_id] = vnfd
2014 vnfd = db_vnfds[vnfd_id]
2015 if not self._has_terminate_config_primitive(vnfd):
2016 continue
2017 # Get the primitive's sorted sequence list
2018 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
2019 for seq in seq_list:
2020 # For each sequence in list, get primitive and call _ns_execute_primitive()
2021 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2022 vnf_index, seq.get("name"))
2023 self.logger.debug(logging_text + step)
2024 # Create the primitive for each sequence, i.e. "primitive": "touch"
2025 primitive = seq.get('name')
2026 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2027 # The following 3 parameters are currently set to None for 'terminate':
2028 # vdu_id, vdu_count_index, vdu_name
2029 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2030 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2031 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2032 # Add sub-operation
2033 self._add_suboperation(db_nslcmop,
2034 nslcmop_id,
2035 vnf_index,
2036 vdu_id,
2037 vdu_count_index,
2038 vdu_name,
2039 primitive,
2040 mapped_primitive_params)
2041 # Sub-operations: Call _ns_execute_primitive() instead of action()
2042 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2043 # nsr_deployed = db_nsr["_admin"]["deployed"]
2044
2045 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
2046 # nsr_id, nslcmop_terminate_action_id)
2047 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
2048 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2049 # if result not in result_ok:
2050 # raise LcmException(
2051 # "terminate_primitive_action for vnf_member_index={}",
2052 # " primitive={} fails with error {}".format(
2053 # vnf_index, seq.get("name"), result_detail))
2054
2055 ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
2056 try:
2057 await self.n2vc.exec_primitive(
2058 ee_id=ee_id,
2059 primitive_name=primitive,
2060 params_dict=mapped_primitive_params
2061 )
2062 except Exception as e:
2063 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2064 raise LcmException(
2065 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2066 .format(vnf_index, seq.get("name"), e),
2067 )
2068
2069 async def terminate(self, nsr_id, nslcmop_id):
2070
2071 # Try to lock HA task here
2072 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2073 if not task_is_locked_by_me:
2074 return
2075
2076 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2077 self.logger.debug(logging_text + "Enter")
2078 db_nsr = None
2079 db_nslcmop = None
2080 exc = None
2081 failed_detail = [] # annotates all failed error messages
2082 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2083 "_admin.current-operation": nslcmop_id,
2084 "_admin.operation-type": "terminate"}
2085 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2086 db_nslcmop_update = {}
2087 nslcmop_operation_state = None
2088 autoremove = False # autoremove after terminated
2089 pending_tasks = []
2090 try:
2091 # wait for any previous tasks in process
2092 step = "Waiting for previous operations to terminate"
2093 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2094
2095 self._write_ns_status(
2096 nsr_id=nsr_id,
2097 ns_state="TERMINATING",
2098 current_operation="TERMINATING",
2099 current_operation_id=nslcmop_id
2100 )
2101
2102 step = "Getting nslcmop={} from db".format(nslcmop_id)
2103 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2104 step = "Getting nsr={} from db".format(nsr_id)
2105 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2106 # nsd = db_nsr["nsd"]
2107 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2108 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2109 return
2110 # #TODO check if VIM is creating and wait
2111 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2112 # Call internal terminate action
2113 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2114
2115 pending_tasks = []
2116
2117 db_nsr_update["operational-status"] = "terminating"
2118 db_nsr_update["config-status"] = "terminating"
2119
2120 # remove NS
2121 try:
2122 step = "delete execution environment"
2123 self.logger.debug(logging_text + step)
2124
2125 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2126 pending_tasks.append(task_delete_ee)
2127 except Exception as e:
2128 msg = "Failed while deleting NS in VCA: {}".format(e)
2129 self.logger.error(msg)
2130 failed_detail.append(msg)
2131
2132 try:
2133 # Delete from k8scluster
2134 step = "delete kdus"
2135 self.logger.debug(logging_text + step)
2136 # print(nsr_deployed)
2137 if nsr_deployed:
2138 for kdu in nsr_deployed.get("K8s", ()):
2139 kdu_instance = kdu.get("kdu-instance")
2140 if not kdu_instance:
2141 continue
2142 if kdu.get("k8scluster-type") == "chart":
2143 task_delete_kdu_instance = asyncio.ensure_future(
2144 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2145 kdu_instance=kdu_instance))
2146 elif kdu.get("k8scluster-type") == "juju":
2147 task_delete_kdu_instance = asyncio.ensure_future(
2148 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2149 kdu_instance=kdu_instance))
2150 else:
2151 self.error(logging_text + "Unknown k8s deployment type {}".
2152 format(kdu.get("k8scluster-type")))
2153 continue
2154 pending_tasks.append(task_delete_kdu_instance)
2155 except LcmException as e:
2156 msg = "Failed while deleting KDUs from NS: {}".format(e)
2157 self.logger.error(msg)
2158 failed_detail.append(msg)
2159
2160 # remove from RO
2161 RO_fail = False
2162
2163 # Delete ns
2164 RO_nsr_id = RO_delete_action = None
2165 if nsr_deployed and nsr_deployed.get("RO"):
2166 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2167 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2168 try:
2169 if RO_nsr_id:
2170 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2171 "Deleting ns from VIM"
2172 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2173 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2174 self.logger.debug(logging_text + step)
2175 desc = await self.RO.delete("ns", RO_nsr_id)
2176 RO_delete_action = desc["action_id"]
2177 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2178 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2179 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2180 if RO_delete_action:
2181 # wait until NS is deleted from VIM
2182 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2183 format(RO_nsr_id, RO_delete_action)
2184 detailed_status_old = None
2185 self.logger.debug(logging_text + step)
2186
2187 delete_timeout = 20 * 60 # 20 minutes
2188 while delete_timeout > 0:
2189 desc = await self.RO.show(
2190 "ns",
2191 item_id_name=RO_nsr_id,
2192 extra_item="action",
2193 extra_item_id=RO_delete_action)
2194 ns_status, ns_status_info = self.RO.check_action_status(desc)
2195 if ns_status == "ERROR":
2196 raise ROclient.ROClientException(ns_status_info)
2197 elif ns_status == "BUILD":
2198 detailed_status = step + "; {}".format(ns_status_info)
2199 elif ns_status == "ACTIVE":
2200 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2201 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2202 break
2203 else:
2204 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2205 if detailed_status != detailed_status_old:
2206 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2207 db_nsr_update["detailed-status"] = detailed_status
2208 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2209 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2210 await asyncio.sleep(5, loop=self.loop)
2211 delete_timeout -= 5
2212 else: # delete_timeout <= 0:
2213 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2214
2215 except ROclient.ROClientException as e:
2216 if e.http_code == 404: # not found
2217 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2218 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2219 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2220 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2221 elif e.http_code == 409: # conflict
2222 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2223 self.logger.debug(logging_text + failed_detail[-1])
2224 RO_fail = True
2225 else:
2226 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2227 self.logger.error(logging_text + failed_detail[-1])
2228 RO_fail = True
2229
2230 # Delete nsd
2231 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2232 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2233 try:
2234 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2235 "Deleting nsd from RO"
2236 await self.RO.delete("nsd", RO_nsd_id)
2237 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2238 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2239 except ROclient.ROClientException as e:
2240 if e.http_code == 404: # not found
2241 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2242 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2243 elif e.http_code == 409: # conflict
2244 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2245 self.logger.debug(logging_text + failed_detail[-1])
2246 RO_fail = True
2247 else:
2248 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2249 self.logger.error(logging_text + failed_detail[-1])
2250 RO_fail = True
2251
2252 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2253 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2254 if not vnf_deployed or not vnf_deployed["id"]:
2255 continue
2256 try:
2257 RO_vnfd_id = vnf_deployed["id"]
2258 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2259 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2260 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2261 await self.RO.delete("vnfd", RO_vnfd_id)
2262 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2263 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2264 except ROclient.ROClientException as e:
2265 if e.http_code == 404: # not found
2266 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2267 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2268 elif e.http_code == 409: # conflict
2269 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2270 self.logger.debug(logging_text + failed_detail[-1])
2271 else:
2272 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2273 self.logger.error(logging_text + failed_detail[-1])
2274
2275 if failed_detail:
2276 terminate_ok = False
2277 self.logger.error(logging_text + " ;".join(failed_detail))
2278 db_nsr_update["operational-status"] = "failed"
2279 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2280 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2281 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2282 db_nslcmop_update["statusEnteredTime"] = time()
2283 else:
2284 terminate_ok = True
2285 db_nsr_update["operational-status"] = "terminated"
2286 db_nsr_update["detailed-status"] = "Done"
2287 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2288 db_nslcmop_update["detailed-status"] = "Done"
2289 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2290 db_nslcmop_update["statusEnteredTime"] = time()
2291 if db_nslcmop["operationParams"].get("autoremove"):
2292 autoremove = True
2293
2294 except (ROclient.ROClientException, DbException, LcmException) as e:
2295 self.logger.error(logging_text + "Exit Exception {}".format(e))
2296 exc = e
2297 except asyncio.CancelledError:
2298 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2299 exc = "Operation was cancelled"
2300 except Exception as e:
2301 exc = traceback.format_exc()
2302 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2303 finally:
2304 if exc and db_nslcmop:
2305 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2306 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2307 db_nslcmop_update["statusEnteredTime"] = time()
2308 try:
2309 if db_nslcmop and db_nslcmop_update:
2310 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2311 if db_nsr:
2312 db_nsr_update["_admin.nslcmop"] = None
2313 db_nsr_update["_admin.current-operation"] = None
2314 db_nsr_update["_admin.operation-type"] = None
2315 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2316
2317 if terminate_ok:
2318 ns_state = "IDLE"
2319 error_description = None
2320 error_detail = None
2321 else:
2322 ns_state = "BROKEN"
2323 error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
2324 error_detail = "; ".join(failed_detail)
2325
2326 self._write_ns_status(
2327 nsr_id=nsr_id,
2328 ns_state=ns_state,
2329 current_operation="IDLE",
2330 current_operation_id=None,
2331 error_description=error_description,
2332 error_detail=error_detail
2333 )
2334
2335 except DbException as e:
2336 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2337 if nslcmop_operation_state:
2338 try:
2339 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2340 "operationState": nslcmop_operation_state,
2341 "autoremove": autoremove},
2342 loop=self.loop)
2343 except Exception as e:
2344 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2345
2346 # wait for pending tasks
2347 done = None
2348 pending = None
2349 if pending_tasks:
2350 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2351 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2352 if not pending:
2353 self.logger.debug(logging_text + 'All tasks finished...')
2354 else:
2355 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2356
2357 self.logger.debug(logging_text + "Exit")
2358 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2359
2360 @staticmethod
2361 def _map_primitive_params(primitive_desc, params, instantiation_params):
2362 """
2363 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2364 The default-value is used. If it is between < > it look for a value at instantiation_params
2365 :param primitive_desc: portion of VNFD/NSD that describes primitive
2366 :param params: Params provided by user
2367 :param instantiation_params: Instantiation params provided by user
2368 :return: a dictionary with the calculated params
2369 """
2370 calculated_params = {}
2371 for parameter in primitive_desc.get("parameter", ()):
2372 param_name = parameter["name"]
2373 if param_name in params:
2374 calculated_params[param_name] = params[param_name]
2375 elif "default-value" in parameter or "value" in parameter:
2376 if "value" in parameter:
2377 calculated_params[param_name] = parameter["value"]
2378 else:
2379 calculated_params[param_name] = parameter["default-value"]
2380 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2381 and calculated_params[param_name].endswith(">"):
2382 if calculated_params[param_name][1:-1] in instantiation_params:
2383 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2384 else:
2385 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2386 format(calculated_params[param_name], primitive_desc["name"]))
2387 else:
2388 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2389 format(param_name, primitive_desc["name"]))
2390
2391 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2392 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2393 width=256)
2394 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2395 calculated_params[param_name] = calculated_params[param_name][7:]
2396
2397 # add always ns_config_info if primitive name is config
2398 if primitive_desc["name"] == "config":
2399 if "ns_config_info" in instantiation_params:
2400 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2401 return calculated_params
2402
2403 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2404 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2405
2406 # find vca_deployed record for this action
2407 try:
2408 for vca_deployed in db_deployed["VCA"]:
2409 if not vca_deployed:
2410 continue
2411 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2412 continue
2413 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2414 continue
2415 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2416 continue
2417 break
2418 else:
2419 # vca_deployed not found
2420 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2421 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2422
2423 # get ee_id
2424 ee_id = vca_deployed.get("ee_id")
2425 if not ee_id:
2426 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2427 "execution environment"
2428 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2429
2430 if primitive == "config":
2431 primitive_params = {"params": primitive_params}
2432
2433 while retries >= 0:
2434 try:
2435 output = await self.n2vc.exec_primitive(
2436 ee_id=ee_id,
2437 primitive_name=primitive,
2438 params_dict=primitive_params
2439 )
2440 # execution was OK
2441 break
2442 except Exception as e:
2443 retries -= 1
2444 if retries >= 0:
2445 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2446 # wait and retry
2447 await asyncio.sleep(retries_interval, loop=self.loop)
2448 else:
2449 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2450
2451 return output, 'OK'
2452
2453 except Exception as e:
2454 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2455
2456 async def action(self, nsr_id, nslcmop_id):
2457
2458 # Try to lock HA task here
2459 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2460 if not task_is_locked_by_me:
2461 return
2462
2463 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2464 self.logger.debug(logging_text + "Enter")
2465 # get all needed from database
2466 db_nsr = None
2467 db_nslcmop = None
2468 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2469 "_admin.current-operation": nslcmop_id,
2470 "_admin.operation-type": "action"}
2471 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2472 db_nslcmop_update = {}
2473 nslcmop_operation_state = None
2474 nslcmop_operation_state_detail = None
2475 exc = None
2476 try:
2477 # wait for any previous tasks in process
2478 step = "Waiting for previous operations to terminate"
2479 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2480
2481 self._write_ns_status(
2482 nsr_id=nsr_id,
2483 ns_state=None,
2484 current_operation="RUNNING ACTION",
2485 current_operation_id=nslcmop_id
2486 )
2487
2488 step = "Getting information from database"
2489 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2490 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2491
2492 nsr_deployed = db_nsr["_admin"].get("deployed")
2493 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2494 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2495 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2496 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2497 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2498
2499 if vnf_index:
2500 step = "Getting vnfr from database"
2501 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2502 step = "Getting vnfd from database"
2503 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2504 else:
2505 if db_nsr.get("nsd"):
2506 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2507 else:
2508 step = "Getting nsd from database"
2509 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2510
2511 # for backward compatibility
2512 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2513 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2514 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2515 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2516
2517 primitive = db_nslcmop["operationParams"]["primitive"]
2518 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2519
2520 # look for primitive
2521 config_primitive_desc = None
2522 if vdu_id:
2523 for vdu in get_iterable(db_vnfd, "vdu"):
2524 if vdu_id == vdu["id"]:
2525 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2526 if config_primitive["name"] == primitive:
2527 config_primitive_desc = config_primitive
2528 break
2529 elif kdu_name:
2530 self.logger.debug(logging_text + "Checking actions in KDUs")
2531 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2532 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2533 if primitive_params:
2534 desc_params.update(primitive_params)
2535 # TODO Check if we will need something at vnf level
2536 index = 0
2537 for kdu in get_iterable(nsr_deployed, "K8s"):
2538 if kdu_name == kdu["kdu-name"]:
2539 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2540 "path": "_admin.deployed.K8s.{}".format(index)}
2541 if primitive == "upgrade":
2542 if desc_params.get("kdu_model"):
2543 kdu_model = desc_params.get("kdu_model")
2544 del desc_params["kdu_model"]
2545 else:
2546 kdu_model = kdu.get("kdu-model")
2547 parts = kdu_model.split(sep=":")
2548 if len(parts) == 2:
2549 kdu_model = parts[0]
2550
2551 if kdu.get("k8scluster-type") == "chart":
2552 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2553 kdu_instance=kdu.get("kdu-instance"),
2554 atomic=True, kdu_model=kdu_model,
2555 params=desc_params, db_dict=db_dict,
2556 timeout=300)
2557 elif kdu.get("k8scluster-type") == "juju":
2558 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2559 kdu_instance=kdu.get("kdu-instance"),
2560 atomic=True, kdu_model=kdu_model,
2561 params=desc_params, db_dict=db_dict,
2562 timeout=300)
2563
2564 else:
2565 msg = "k8scluster-type not defined"
2566 raise LcmException(msg)
2567
2568 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2569 break
2570 elif primitive == "rollback":
2571 if kdu.get("k8scluster-type") == "chart":
2572 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2573 kdu_instance=kdu.get("kdu-instance"),
2574 db_dict=db_dict)
2575 elif kdu.get("k8scluster-type") == "juju":
2576 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2577 kdu_instance=kdu.get("kdu-instance"),
2578 db_dict=db_dict)
2579 else:
2580 msg = "k8scluster-type not defined"
2581 raise LcmException(msg)
2582 break
2583 elif primitive == "status":
2584 if kdu.get("k8scluster-type") == "chart":
2585 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2586 kdu_instance=kdu.get("kdu-instance"))
2587 elif kdu.get("k8scluster-type") == "juju":
2588 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2589 kdu_instance=kdu.get("kdu-instance"))
2590 else:
2591 msg = "k8scluster-type not defined"
2592 raise LcmException(msg)
2593 break
2594 index += 1
2595
2596 else:
2597 raise LcmException("KDU '{}' not found".format(kdu_name))
2598 if output:
2599 db_nslcmop_update["detailed-status"] = output
2600 db_nslcmop_update["operationState"] = 'COMPLETED'
2601 db_nslcmop_update["statusEnteredTime"] = time()
2602 else:
2603 db_nslcmop_update["detailed-status"] = ''
2604 db_nslcmop_update["operationState"] = 'FAILED'
2605 db_nslcmop_update["statusEnteredTime"] = time()
2606 return
2607 elif vnf_index:
2608 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2609 if config_primitive["name"] == primitive:
2610 config_primitive_desc = config_primitive
2611 break
2612 else:
2613 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2614 if config_primitive["name"] == primitive:
2615 config_primitive_desc = config_primitive
2616 break
2617
2618 if not config_primitive_desc:
2619 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2620 format(primitive))
2621
2622 desc_params = {}
2623 if vnf_index:
2624 if db_vnfr.get("additionalParamsForVnf"):
2625 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2626 if vdu_id:
2627 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2628 if vdur.get("additionalParams"):
2629 desc_params = self._format_additional_params(vdur["additionalParams"])
2630 else:
2631 if db_nsr.get("additionalParamsForNs"):
2632 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2633
2634 # TODO check if ns is in a proper status
2635 output, detail = await self._ns_execute_primitive(
2636 db_deployed=nsr_deployed,
2637 member_vnf_index=vnf_index,
2638 vdu_id=vdu_id,
2639 vdu_name=vdu_name,
2640 vdu_count_index=vdu_count_index,
2641 primitive=primitive,
2642 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2643
2644 detailed_status = output
2645 if detail == 'OK':
2646 result = 'COMPLETED'
2647 else:
2648 result = 'FAILED'
2649
2650 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2651 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2652 db_nslcmop_update["statusEnteredTime"] = time()
2653 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2654 return # database update is called inside finally
2655
2656 except (DbException, LcmException) as e:
2657 self.logger.error(logging_text + "Exit Exception {}".format(e))
2658 exc = e
2659 except asyncio.CancelledError:
2660 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2661 exc = "Operation was cancelled"
2662 except Exception as e:
2663 exc = traceback.format_exc()
2664 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2665 finally:
2666 if exc and db_nslcmop:
2667 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2668 "FAILED {}: {}".format(step, exc)
2669 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2670 db_nslcmop_update["statusEnteredTime"] = time()
2671 try:
2672 if db_nslcmop_update:
2673 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2674 if db_nsr:
2675 db_nsr_update["_admin.nslcmop"] = None
2676 db_nsr_update["_admin.operation-type"] = None
2677 db_nsr_update["_admin.nslcmop"] = None
2678 db_nsr_update["_admin.current-operation"] = None
2679 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2680 self._write_ns_status(
2681 nsr_id=nsr_id,
2682 ns_state=None,
2683 current_operation="IDLE",
2684 current_operation_id=None
2685 )
2686 except DbException as e:
2687 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2688 self.logger.debug(logging_text + "Exit")
2689 if nslcmop_operation_state:
2690 try:
2691 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2692 "operationState": nslcmop_operation_state},
2693 loop=self.loop)
2694 except Exception as e:
2695 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2696 self.logger.debug(logging_text + "Exit")
2697 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2698 return nslcmop_operation_state, nslcmop_operation_state_detail
2699
2700 async def scale(self, nsr_id, nslcmop_id):
2701
2702 # Try to lock HA task here
2703 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2704 if not task_is_locked_by_me:
2705 return
2706
2707 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2708 self.logger.debug(logging_text + "Enter")
2709 # get all needed from database
2710 db_nsr = None
2711 db_nslcmop = None
2712 db_nslcmop_update = {}
2713 nslcmop_operation_state = None
2714 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2715 "_admin.current-operation": nslcmop_id,
2716 "_admin.operation-type": "scale"}
2717 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2718 exc = None
2719 # in case of error, indicates what part of scale was failed to put nsr at error status
2720 scale_process = None
2721 old_operational_status = ""
2722 old_config_status = ""
2723 vnfr_scaled = False
2724 try:
2725 # wait for any previous tasks in process
2726 step = "Waiting for previous operations to terminate"
2727 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2728
2729 self._write_ns_status(
2730 nsr_id=nsr_id,
2731 ns_state=None,
2732 current_operation="SCALING",
2733 current_operation_id=nslcmop_id
2734 )
2735
2736 step = "Getting nslcmop from database"
2737 self.logger.debug(step + " after having waited for previous tasks to be completed")
2738 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2739 step = "Getting nsr from database"
2740 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2741
2742 old_operational_status = db_nsr["operational-status"]
2743 old_config_status = db_nsr["config-status"]
2744 step = "Parsing scaling parameters"
2745 # self.logger.debug(step)
2746 db_nsr_update["operational-status"] = "scaling"
2747 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2748 nsr_deployed = db_nsr["_admin"].get("deployed")
2749
2750 #######
2751 nsr_deployed = db_nsr["_admin"].get("deployed")
2752 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2753 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2754 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2755 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2756 #######
2757
2758 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2759 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2760 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2761 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2762 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2763
2764 # for backward compatibility
2765 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2766 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2767 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2768 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2769
2770 step = "Getting vnfr from database"
2771 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2772 step = "Getting vnfd from database"
2773 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2774
2775 step = "Getting scaling-group-descriptor"
2776 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2777 if scaling_descriptor["name"] == scaling_group:
2778 break
2779 else:
2780 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2781 "at vnfd:scaling-group-descriptor".format(scaling_group))
2782
2783 # cooldown_time = 0
2784 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2785 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2786 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2787 # break
2788
2789 # TODO check if ns is in a proper status
2790 step = "Sending scale order to VIM"
2791 nb_scale_op = 0
2792 if not db_nsr["_admin"].get("scaling-group"):
2793 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2794 admin_scale_index = 0
2795 else:
2796 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2797 if admin_scale_info["name"] == scaling_group:
2798 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2799 break
2800 else: # not found, set index one plus last element and add new entry with the name
2801 admin_scale_index += 1
2802 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2803 RO_scaling_info = []
2804 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2805 if scaling_type == "SCALE_OUT":
2806 # count if max-instance-count is reached
2807 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2808 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2809 if nb_scale_op >= max_instance_count:
2810 raise LcmException("reached the limit of {} (max-instance-count) "
2811 "scaling-out operations for the "
2812 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2813
2814 nb_scale_op += 1
2815 vdu_scaling_info["scaling_direction"] = "OUT"
2816 vdu_scaling_info["vdu-create"] = {}
2817 for vdu_scale_info in scaling_descriptor["vdu"]:
2818 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2819 "type": "create", "count": vdu_scale_info.get("count", 1)})
2820 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2821
2822 elif scaling_type == "SCALE_IN":
2823 # count if min-instance-count is reached
2824 min_instance_count = 0
2825 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2826 min_instance_count = int(scaling_descriptor["min-instance-count"])
2827 if nb_scale_op <= min_instance_count:
2828 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2829 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2830 nb_scale_op -= 1
2831 vdu_scaling_info["scaling_direction"] = "IN"
2832 vdu_scaling_info["vdu-delete"] = {}
2833 for vdu_scale_info in scaling_descriptor["vdu"]:
2834 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2835 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2836 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2837
2838 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2839 vdu_create = vdu_scaling_info.get("vdu-create")
2840 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2841 if vdu_scaling_info["scaling_direction"] == "IN":
2842 for vdur in reversed(db_vnfr["vdur"]):
2843 if vdu_delete.get(vdur["vdu-id-ref"]):
2844 vdu_delete[vdur["vdu-id-ref"]] -= 1
2845 vdu_scaling_info["vdu"].append({
2846 "name": vdur["name"],
2847 "vdu_id": vdur["vdu-id-ref"],
2848 "interface": []
2849 })
2850 for interface in vdur["interfaces"]:
2851 vdu_scaling_info["vdu"][-1]["interface"].append({
2852 "name": interface["name"],
2853 "ip_address": interface["ip-address"],
2854 "mac_address": interface.get("mac-address"),
2855 })
2856 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2857
2858 # PRE-SCALE BEGIN
2859 step = "Executing pre-scale vnf-config-primitive"
2860 if scaling_descriptor.get("scaling-config-action"):
2861 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2862 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2863 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2864 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2865 step = db_nslcmop_update["detailed-status"] = \
2866 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2867
2868 # look for primitive
2869 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2870 if config_primitive["name"] == vnf_config_primitive:
2871 break
2872 else:
2873 raise LcmException(
2874 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2875 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2876 "primitive".format(scaling_group, config_primitive))
2877
2878 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2879 if db_vnfr.get("additionalParamsForVnf"):
2880 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2881
2882 scale_process = "VCA"
2883 db_nsr_update["config-status"] = "configuring pre-scaling"
2884 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2885
2886 # Pre-scale reintent check: Check if this sub-operation has been executed before
2887 op_index = self._check_or_add_scale_suboperation(
2888 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2889 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2890 # Skip sub-operation
2891 result = 'COMPLETED'
2892 result_detail = 'Done'
2893 self.logger.debug(logging_text +
2894 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2895 vnf_config_primitive, result, result_detail))
2896 else:
2897 if (op_index == self.SUBOPERATION_STATUS_NEW):
2898 # New sub-operation: Get index of this sub-operation
2899 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2900 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2901 format(vnf_config_primitive))
2902 else:
2903 # Reintent: Get registered params for this existing sub-operation
2904 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2905 vnf_index = op.get('member_vnf_index')
2906 vnf_config_primitive = op.get('primitive')
2907 primitive_params = op.get('primitive_params')
2908 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2909 format(vnf_config_primitive))
2910 # Execute the primitive, either with new (first-time) or registered (reintent) args
2911 result, result_detail = await self._ns_execute_primitive(
2912 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2913 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2914 vnf_config_primitive, result, result_detail))
2915 # Update operationState = COMPLETED | FAILED
2916 self._update_suboperation_status(
2917 db_nslcmop, op_index, result, result_detail)
2918
2919 if result == "FAILED":
2920 raise LcmException(result_detail)
2921 db_nsr_update["config-status"] = old_config_status
2922 scale_process = None
2923 # PRE-SCALE END
2924
2925 # SCALE RO - BEGIN
2926 # Should this block be skipped if 'RO_nsr_id' == None ?
2927 # if (RO_nsr_id and RO_scaling_info):
2928 if RO_scaling_info:
2929 scale_process = "RO"
2930 # Scale RO reintent check: Check if this sub-operation has been executed before
2931 op_index = self._check_or_add_scale_suboperation(
2932 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2933 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2934 # Skip sub-operation
2935 result = 'COMPLETED'
2936 result_detail = 'Done'
2937 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2938 result, result_detail))
2939 else:
2940 if (op_index == self.SUBOPERATION_STATUS_NEW):
2941 # New sub-operation: Get index of this sub-operation
2942 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2943 self.logger.debug(logging_text + "New sub-operation RO")
2944 else:
2945 # Reintent: Get registered params for this existing sub-operation
2946 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2947 RO_nsr_id = op.get('RO_nsr_id')
2948 RO_scaling_info = op.get('RO_scaling_info')
2949 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2950 vnf_config_primitive))
2951
2952 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2953 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2954 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2955 # wait until ready
2956 RO_nslcmop_id = RO_desc["instance_action_id"]
2957 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2958
2959 RO_task_done = False
2960 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2961 detailed_status_old = None
2962 self.logger.debug(logging_text + step)
2963
2964 deployment_timeout = 1 * 3600 # One hour
2965 while deployment_timeout > 0:
2966 if not RO_task_done:
2967 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2968 extra_item_id=RO_nslcmop_id)
2969 ns_status, ns_status_info = self.RO.check_action_status(desc)
2970 if ns_status == "ERROR":
2971 raise ROclient.ROClientException(ns_status_info)
2972 elif ns_status == "BUILD":
2973 detailed_status = step + "; {}".format(ns_status_info)
2974 elif ns_status == "ACTIVE":
2975 RO_task_done = True
2976 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2977 self.logger.debug(logging_text + step)
2978 else:
2979 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2980 else:
2981
2982 if ns_status == "ERROR":
2983 raise ROclient.ROClientException(ns_status_info)
2984 elif ns_status == "BUILD":
2985 detailed_status = step + "; {}".format(ns_status_info)
2986 elif ns_status == "ACTIVE":
2987 step = detailed_status = \
2988 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2989 if not vnfr_scaled:
2990 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2991 vnfr_scaled = True
2992 try:
2993 desc = await self.RO.show("ns", RO_nsr_id)
2994 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2995 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2996 break
2997 except LcmExceptionNoMgmtIP:
2998 pass
2999 else:
3000 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
3001 if detailed_status != detailed_status_old:
3002 self._update_suboperation_status(
3003 db_nslcmop, op_index, 'COMPLETED', detailed_status)
3004 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
3005 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3006
3007 await asyncio.sleep(5, loop=self.loop)
3008 deployment_timeout -= 5
3009 if deployment_timeout <= 0:
3010 self._update_suboperation_status(
3011 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3012 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3013
3014 # update VDU_SCALING_INFO with the obtained ip_addresses
3015 if vdu_scaling_info["scaling_direction"] == "OUT":
3016 for vdur in reversed(db_vnfr["vdur"]):
3017 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3018 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3019 vdu_scaling_info["vdu"].append({
3020 "name": vdur["name"],
3021 "vdu_id": vdur["vdu-id-ref"],
3022 "interface": []
3023 })
3024 for interface in vdur["interfaces"]:
3025 vdu_scaling_info["vdu"][-1]["interface"].append({
3026 "name": interface["name"],
3027 "ip_address": interface["ip-address"],
3028 "mac_address": interface.get("mac-address"),
3029 })
3030 del vdu_scaling_info["vdu-create"]
3031
3032 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3033 # SCALE RO - END
3034
3035 scale_process = None
3036 if db_nsr_update:
3037 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3038
3039 # POST-SCALE BEGIN
3040 # execute primitive service POST-SCALING
3041 step = "Executing post-scale vnf-config-primitive"
3042 if scaling_descriptor.get("scaling-config-action"):
3043 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3044 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3045 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3046 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3047 step = db_nslcmop_update["detailed-status"] = \
3048 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3049
3050 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3051 if db_vnfr.get("additionalParamsForVnf"):
3052 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3053
3054 # look for primitive
3055 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3056 if config_primitive["name"] == vnf_config_primitive:
3057 break
3058 else:
3059 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3060 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3061 "match any vnf-configuration:config-primitive".format(scaling_group,
3062 config_primitive))
3063 scale_process = "VCA"
3064 db_nsr_update["config-status"] = "configuring post-scaling"
3065 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3066
3067 # Post-scale reintent check: Check if this sub-operation has been executed before
3068 op_index = self._check_or_add_scale_suboperation(
3069 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3070 if op_index == self.SUBOPERATION_STATUS_SKIP:
3071 # Skip sub-operation
3072 result = 'COMPLETED'
3073 result_detail = 'Done'
3074 self.logger.debug(logging_text +
3075 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3076 format(vnf_config_primitive, result, result_detail))
3077 else:
3078 if op_index == self.SUBOPERATION_STATUS_NEW:
3079 # New sub-operation: Get index of this sub-operation
3080 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3081 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3082 format(vnf_config_primitive))
3083 else:
3084 # Reintent: Get registered params for this existing sub-operation
3085 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3086 vnf_index = op.get('member_vnf_index')
3087 vnf_config_primitive = op.get('primitive')
3088 primitive_params = op.get('primitive_params')
3089 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3090 format(vnf_config_primitive))
3091 # Execute the primitive, either with new (first-time) or registered (reintent) args
3092 result, result_detail = await self._ns_execute_primitive(
3093 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3094 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3095 vnf_config_primitive, result, result_detail))
3096 # Update operationState = COMPLETED | FAILED
3097 self._update_suboperation_status(
3098 db_nslcmop, op_index, result, result_detail)
3099
3100 if result == "FAILED":
3101 raise LcmException(result_detail)
3102 db_nsr_update["config-status"] = old_config_status
3103 scale_process = None
3104 # POST-SCALE END
3105
3106 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3107 db_nslcmop_update["statusEnteredTime"] = time()
3108 db_nslcmop_update["detailed-status"] = "done"
3109 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3110 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3111 else old_operational_status
3112 db_nsr_update["config-status"] = old_config_status
3113 return
3114 except (ROclient.ROClientException, DbException, LcmException) as e:
3115 self.logger.error(logging_text + "Exit Exception {}".format(e))
3116 exc = e
3117 except asyncio.CancelledError:
3118 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3119 exc = "Operation was cancelled"
3120 except Exception as e:
3121 exc = traceback.format_exc()
3122 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3123 finally:
3124 if exc:
3125 if db_nslcmop:
3126 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3127 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3128 db_nslcmop_update["statusEnteredTime"] = time()
3129 if db_nsr:
3130 db_nsr_update["operational-status"] = old_operational_status
3131 db_nsr_update["config-status"] = old_config_status
3132 db_nsr_update["detailed-status"] = ""
3133 db_nsr_update["_admin.nslcmop"] = None
3134 if scale_process:
3135 if "VCA" in scale_process:
3136 db_nsr_update["config-status"] = "failed"
3137 if "RO" in scale_process:
3138 db_nsr_update["operational-status"] = "failed"
3139 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3140 exc)
3141 try:
3142 if db_nslcmop and db_nslcmop_update:
3143 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3144 if db_nsr:
3145 db_nsr_update["_admin.current-operation"] = None
3146 db_nsr_update["_admin.operation-type"] = None
3147 db_nsr_update["_admin.nslcmop"] = None
3148 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3149
3150 self._write_ns_status(
3151 nsr_id=nsr_id,
3152 ns_state=None,
3153 current_operation="IDLE",
3154 current_operation_id=None
3155 )
3156
3157 except DbException as e:
3158 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3159 if nslcmop_operation_state:
3160 try:
3161 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3162 "operationState": nslcmop_operation_state},
3163 loop=self.loop)
3164 # if cooldown_time:
3165 # await asyncio.sleep(cooldown_time, loop=self.loop)
3166 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3167 except Exception as e:
3168 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3169 self.logger.debug(logging_text + "Exit")
3170 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")