fix bug938 added method to find ee_id
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
29 from n2vc.k8s_helm_conn import K8sHelmConnector
30 from n2vc.k8s_juju_conn import K8sJujuConnector
31
32 from osm_common.dbbase import DbException
33 from osm_common.fsbase import FsException
34
35 from n2vc.n2vc_juju_conn import N2VCJujuConnector
36 from n2vc.exceptions import N2VCException
37
38 from copy import copy, deepcopy
39 from http import HTTPStatus
40 from time import time
41 from uuid import uuid4
42
43 __author__ = "Alfonso Tierno"
44
45
def get_iterable(in_dict, in_key):
    """
    Fetch in_dict[in_key] in a form that can always be iterated over
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or an empty tuple if the key is not present or its value is falsy (None, False, ...)
    """
    value = in_dict.get(in_key)
    return value if value else ()
56
57
def populate_dict(target_dict, key_list, value):
    """
    Store value inside target_dict following the path of keys given by key_list. Intermediate dictionaries are
    created when they do not exist; the last item of key_list is the key that receives the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: list of keys to insert at target_dict
    :param value: content assigned to the last key
    :return: None
    """
    cursor = target_dict
    # walk down (creating when needed) all levels but the last one
    for key in key_list[:-1]:
        cursor = cursor.setdefault(key, {})
    cursor[key_list[-1]] = value
72
73
74 class NsLcm(LcmBase):
# Timeouts used by the NS life cycle operations.
# NOTE(review): the values look like durations expressed in seconds - confirm against the code that consumes them
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
total_deploy_timeout = 2 * 3600  # global timeout for deployment
timeout_charm_delete = 10 * 60  # NOTE(review): presumably the time allowed to delete a charm - confirm
timeout_primitive = 10 * 60  # timeout for primitive execution

# Negative sentinel codes related to sub-operations.
# NOTE(review): presumably returned instead of a (non negative) sub-operation index - confirm at the callers
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
83
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init: stores the received handlers and creates the N2VC, K8s (helm and juju) and RO clients
    :param db: database handler; passed to the base class and to the N2VC/K8s connectors
    :param msg: messaging handler; passed to the base class
    :param fs: filesystem storage handler; passed to the base class and to the N2VC/K8s connectors
    :param lcm_tasks: task registry, stored at self.lcm_tasks
    :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
    :param vca_config: dictionary with the VCA configuration. 'host' and 'port' are mandatory; 'user',
        'kubectlpath', 'helmpath' and 'jujupath' are read if present. It is modified in place: the keys 'pubkey',
        'cacert' and 'apiproxy' are duplicated as 'public_key', 'ca_cert' and 'api_proxy'
    :param loop: asyncio event loop, passed to the N2VC connector and to the RO client
    :return: None
    """
    super().__init__(
        db=db,
        msg=msg,
        fs=fs,
        logger=logging.getLogger('lcm.ns')
    )

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.ro_config = ro_config
    self.vca_config = vca_config

    # publish the legacy configuration keys also under their alternative names
    for legacy_key, alias_key in (('pubkey', 'public_key'), ('cacert', 'ca_cert'), ('apiproxy', 'api_proxy')):
        if legacy_key in self.vca_config:
            self.vca_config[alias_key] = self.vca_config[legacy_key]

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    # ca_cert and api_proxy are not passed as arguments; presumably the connector reads them from vca_config
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=self.vca_config.get('user', None),
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
    )

    # create K8s connectors, both use the same kubectl binary
    kubectl_path = self.vca_config.get("kubectlpath")
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
142
def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback provided to the N2VC connector (on_update_db). It only writes the received arguments to the debug log
    :param table: received from the caller, only logged
    :param filter: received from the caller, only logged (the name shadows the builtin, kept for compatibility)
    :param path: received from the caller, only logged
    :param updated_data: received from the caller, only logged
    :return: None
    """
    message = '_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'.format(
        table, filter, path, updated_data)
    self.logger.debug(message)
    # NOTE: writing the NS status to the database is not done here. A previous draft (disabled) got the nsr with
    # self.db.get_one(table=table, q_filter=filter), iterated over its '_admin.deployed.VCA' list and logged
    # 'Error writing NS status to db' on any exception
169
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input vnfd is not modified.
    The cloud-init (or the content of cloud-init-file) of every vdu is rendered with Jinja2 using additionalParams
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as variables of the Jinja2 template
    :param nsrId: Id of the NSR. Currently not used
    :return: copy of vnfd, without the keys not used by RO and with the cloud-init already rendered
    :raises LcmException: if a cloud-init-file cannot be read, the template cannot be parsed, or the template uses
        a variable not present at additionalParams. The original exception is chained as the cause
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # storage info is taken from the input vnfd, because "_admin" was removed from the copy
                base_folder = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                continue

            env = Environment()
            ast = env.parse(cloud_init_content)
            mandatory_vars = meta.find_undeclared_variables(ast)
            # sorted, so the variable reported when several are missing does not depend on the set order
            for var in sorted(mandatory_vars):
                if not additionalParams or var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            template = Template(cloud_init_content)
            cloud_init_content = template.render(additionalParams or {})
            vdu["cloud-init"] = cloud_init_content

        return vnfd_RO
    except FsException as e:
        # these exceptions come from inside the vdu loop, so 'vdu' and 'cloud_init_file' are already assigned
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e)) from e
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e)) from e
227
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. 'vimAccountId' is mandatory; 'wimAccountId', 'vduImage', 'ssh_keys',
        'vnf' and 'vld' are processed if present
    :param nsd: ns descriptor. Its 'constituent-vnfd' list maps every member-vnf-index to a vnfd-id-ref
    :param vnfd_dict: dictionary with the vnfd contents, indexed by the vnfd-id-ref used at the nsd
    :param n2vc_key_list: content stored as 'mgmt_keys' of every vdu that requires ssh access. None means []
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM or WIM account is not ENABLED, or if an instantiation parameter references a
        member-vnf-index, connection point or internal connection point not present at the descriptors
    """
    vim_2_RO = {}  # cache of OSM vim account id -> RO id
    wim_2_RO = {}  # cache of OSM wim account id -> RO id
    # (key at the OSM instantiation params, key at the RO interface) for the addresses that can be forced
    address_keys = (("ip-address", "ip_address"), ("mac-address", "mac_address"))
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only strings are translated; any other value (None, booleans, ...) is passed to RO as it is
        if not isinstance(wim_account, str):
            return wim_account
        if wim_account in wim_2_RO:
            return wim_2_RO[wim_account]

        db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
        if db_wim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]))
        RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
        wim_2_RO[wim_account] = RO_wim_id
        return RO_wim_id

    def ip_profile_2_RO(ip_profile):
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = [ds['address'] for ds in RO_ip_profile.pop("dns-server")]
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    def vld_sites_2_RO(vim_network):
        # a dictionary contains {vim_account: network}; otherwise the same network is used without datacenter
        if isinstance(vim_network, dict):
            return [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                    for vim_account, vim_net in vim_network.items()]
        return [{"netmap-use": vim_network}]

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the management keys at the vdus whose configuration requires ssh access
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in get_iterable(vnfd, "vdu"):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # the management cp is attached to the first vdu that has an interface connected to it.
                # get_iterable is used because a vdu without interfaces must not break the instantiation
                for vdu_interface in get_iterable(vdu, "interface"):
                    if vdu_interface.get("external-connection-point-ref") == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # instantiation parameters per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        vnf_index = vnf_params["member-vnf-index"]
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_index:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_index))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            for volume_params in get_iterable(vdu_params, "volume"):
                if volume_params.get("vim-volume-id"):
                    populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus", vdu_params["id"], "devices",
                                                 volume_params["name"], "vim_id"),
                                  volume_params["vim-volume-id"])
            for interface_params in get_iterable(vdu_params, "interface"):
                for osm_key, RO_key in address_keys + (("floating-ip-required", "floating-ip"),):
                    if interface_params.get(osm_key):
                        populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus", vdu_params["id"], "interfaces",
                                                     interface_params["name"], RO_key),
                                      interface_params[osm_key])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            for net_key in ("vim-network-name", "vim-network-id"):
                if internal_vld_params.get(net_key):
                    populate_dict(RO_ns_params, ("vnfs", vnf_index, "networks",
                                                 internal_vld_params["name"], net_key),
                                  internal_vld_params[net_key])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_index, "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):
                populate_dict(RO_ns_params, ("vnfs", vnf_index, "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for the first interface attached to this internal connection point
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") != icp_params["id-ref"]:
                            continue
                        for osm_key, RO_key in address_keys:
                            if icp_params.get(osm_key):
                                populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], RO_key),
                                              icp_params[osm_key])
                        iface_found = True
                        break
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_index, icp_params["id-ref"]))

    # instantiation parameters per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))

        # both keys fill the same "sites" entry: when both are provided, vim-network-id prevails
        for net_key in ("vim-network-name", "vim-network-id"):
            if vld_params.get(net_key):
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"),
                              vld_sites_2_RO(vld_params[net_key]))

        if vld_params.get("ns-net") and isinstance(vld_params["ns-net"], dict):
            # only one network can be used: the last item of the dictionary prevails
            for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)

        for cp_params in get_iterable(vld_params, "vnfd-connection-point-ref"):
            # look for the vnfd of this member-vnf-index
            for constituent_vnfd in nsd["constituent-vnfd"]:
                if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                    vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                    break
            else:
                raise LcmException(
                    "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                    "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
            # look for the interface; vdu_descriptor and interface_descriptor keep the match after the loops
            match_cp = False
            for vdu_descriptor in vnf_descriptor["vdu"]:
                for interface_descriptor in vdu_descriptor["interface"]:
                    if interface_descriptor.get("external-connection-point-ref") == \
                            cp_params["vnfd-connection-point-ref"]:
                        match_cp = True
                        break
                if match_cp:
                    break
            else:
                raise LcmException(
                    "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                    "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                        cp_params["member-vnf-index-ref"],
                        cp_params["vnfd-connection-point-ref"],
                        vnf_descriptor["id"]))
            for osm_key, RO_key in address_keys:
                if cp_params.get(osm_key):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], RO_key),
                                  cp_params[osm_key])
    return RO_ns_params
498
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr for a scaling operation and writes the result to the database
    :param db_vnfr: content of the vnfr. Its "vdur" list is modified; entries with "pdu-type" are never touched
    :param vdu_create: dictionary {vdu-id-ref: number of vdur to add}. Not modified. Can be None
    :param vdu_delete: dictionary {vdu-id-ref: number of vdur to remove}. Not modified. Can be None
    :return: None
    :raises LcmException: if some requested vdu-id-ref cannot be created or deleted, because there is not any
        (or not enough) vdur with this vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # iterate backwards, so that inserting/deleting after the current position does not affect the pending ones
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # every new vdur is a copy of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Pending creations and pending deletions are reported separately
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
537
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the information (vim id, name, status) that RO reports for its network
    :param ns_update_nsr: dictionary to be filled with the updated info, using the keys "vld.<index>"
    :param db_nsr: content of db_nsr. Its vld items are also modified
    :param nsr_desc_RO: nsr descriptor from RO. The networks are read from its "nets" list
    :return: Nothing, LcmException is raised if some vld is not found among the RO networks
    """
    for position, vld in enumerate(db_nsr.get("vld") or ()):
        # the first RO network whose ns_net_osm_id is the id of this vld
        RO_net = next((net for net in (nsr_desc_RO.get("nets") or ())
                       if net.get("ns_net_osm_id") == vld["id"]), None)
        if RO_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld.update({
            "vim-id": RO_net.get("vim_net_id"),
            "name": RO_net.get("vim_name"),
            "status": RO_net.get("status"),
            "status-detailed": RO_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
559
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" list (with the "vms" of every vnf) and its "nets" list
        are read
    :return: Nothing, LcmException is raised on errors
    :raises LcmExceptionNoMgmtIP: if RO does not report an ip address for a vnf and the vnfr has not any either
    :raises LcmException: if a vnf, vdur, vdur interface or vnf vld is not found at the RO info
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf of this member-vnf-index; the 'else' of this loop raises if there is not any
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}  # changes to be written at database for this vnfr
            if vnf_RO.get("ip_address"):
                # RO can report several addresses separated by ';'. Only the first one is used
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms of this vdu-id-ref already skipped, to select the one at position count-index
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every interface of the vdur must match one RO interface by its "internal_name"
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # there is not any RO vm for this vdu-id-ref at position count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched against the RO networks by "vnf_net_osm_id"
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
631
def _get_ns_config_info(self, nsr_id):
    """
    Builds the mapping between the vnf,vdu elements and the N2VC id (the "application" of every deployed VCA)
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
        Entries of the VCA list without member-vnf-index are not included
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    mapping = {}
    for deployed in db_nsr["_admin"]["deployed"]["VCA"]:
        vnf_index = deployed["member-vnf-index"]
        if not vnf_index:
            continue
        vdu_id = deployed["vdu_id"]
        application = deployed["application"]
        if vdu_id:
            element = "{}.{}.{}".format(vnf_index, vdu_id, deployed["vdu_count_index"])
        else:
            element = vnf_index
        mapping[element] = application
    return {"osm-config-mapping": mapping}
653
@staticmethod
def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
    """
    Builds the list of initial-config-primitive to execute: the ones of the descriptor plus the internal ones
    (config, verify-ssh-credentials) when they are needed. The list of the descriptor is not modified
    :param desc_primitive_list: information of the descriptor. Can be None or empty
    :param vca_deployed: information of the deployed element. "member-vnf-index" is empty for an NS and filled
        for a VNF/VDU; "ssh-public-key" is checked to know if this element contains a ssh public key
    :return: The modified list. Can be an empty list, but always a list
    """
    primitive_list = desc_primitive_list.copy() if desc_primitive_list else []
    # position of the first primitive named config. None if not present
    config_position = next(
        (position for position, primitive in enumerate(primitive_list) if primitive["name"] == "config"), None)

    if not vca_deployed["member-vnf-index"]:
        # for NS, add always a config primitive if not present (bug 874)
        if config_position is None:
            primitive_list.insert(0, {"name": "config", "parameter": []})
            config_position = 0
        return primitive_list

    # for VNF/VDU add verify-ssh-credentials after config
    if config_position is not None and vca_deployed.get("ssh-public-key"):
        primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
    return primitive_list
683
684 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
685 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
686
687 db_nsr_update = {}
688 RO_descriptor_number = 0 # number of descriptors created at RO
689 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
690 start_deploy = time()
691 vdu_flag = False # If any of the VNFDs has VDUs
692 ns_params = db_nslcmop.get("operationParams")
693
694 # deploy RO
695
696 # get vnfds, instantiate at RO
697
698 for c_vnf in nsd.get("constituent-vnfd", ()):
699 member_vnf_index = c_vnf["member-vnf-index"]
700 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
701 if vnfd.get("vdu"):
702 vdu_flag = True
703 vnfd_ref = vnfd["id"]
704 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
705 " RO".format(vnfd_ref, member_vnf_index)
706 # self.logger.debug(logging_text + step)
707 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
708 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
709 RO_descriptor_number += 1
710
711 # look position at deployed.RO.vnfd if not present it will be appended at the end
712 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
713 if vnf_deployed["member-vnf-index"] == member_vnf_index:
714 break
715 else:
716 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
717 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
718
719 # look if present
720 RO_update = {"member-vnf-index": member_vnf_index}
721 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
722 if vnfd_list:
723 RO_update["id"] = vnfd_list[0]["uuid"]
724 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
725 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
726 else:
727 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
728 get("additionalParamsForVnf"), nsr_id)
729 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
730 RO_update["id"] = desc["uuid"]
731 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
732 vnfd_ref, member_vnf_index, desc["uuid"]))
733 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
734 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
735 self.update_db_2("nsrs", nsr_id, db_nsr_update)
736 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
737
738 # create nsd at RO
739 nsd_ref = nsd["id"]
740 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
741 # self.logger.debug(logging_text + step)
742
743 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
744 RO_descriptor_number += 1
745 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
746 if nsd_list:
747 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
748 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
749 nsd_ref, RO_nsd_uuid))
750 else:
751 nsd_RO = deepcopy(nsd)
752 nsd_RO["id"] = RO_osm_nsd_id
753 nsd_RO.pop("_id", None)
754 nsd_RO.pop("_admin", None)
755 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
756 member_vnf_index = c_vnf["member-vnf-index"]
757 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
758 for c_vld in nsd_RO.get("vld", ()):
759 for cp in c_vld.get("vnfd-connection-point-ref", ()):
760 member_vnf_index = cp["member-vnf-index-ref"]
761 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
762
763 desc = await self.RO.create("nsd", descriptor=nsd_RO)
764 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
765 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
766 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
767 self.update_db_2("nsrs", nsr_id, db_nsr_update)
768 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
769
770 # Crate ns at RO
771 # if present use it unless in error status
772 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
773 if RO_nsr_id:
774 try:
775 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
776 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
777 desc = await self.RO.show("ns", RO_nsr_id)
778 except ROclient.ROClientException as e:
779 if e.http_code != HTTPStatus.NOT_FOUND:
780 raise
781 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
782 if RO_nsr_id:
783 ns_status, ns_status_info = self.RO.check_ns_status(desc)
784 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
785 if ns_status == "ERROR":
786 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
787 .format(RO_nsr_id)
788 self.logger.debug(logging_text + step)
789 await self.RO.delete("ns", RO_nsr_id)
790 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
791 if not RO_nsr_id:
792 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
793 # self.logger.debug(logging_text + step)
794
795 # check if VIM is creating and wait look if previous tasks in process
796 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
797 if task_dependency:
798 step = "Waiting for related tasks to be completed: {}".format(task_name)
799 self.logger.debug(logging_text + step)
800 await asyncio.wait(task_dependency, timeout=3600)
801 if ns_params.get("vnf"):
802 for vnf in ns_params["vnf"]:
803 if "vimAccountId" in vnf:
804 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
805 vnf["vimAccountId"])
806 if task_dependency:
807 step = "Waiting for related tasks to be completed: {}".format(task_name)
808 self.logger.debug(logging_text + step)
809 await asyncio.wait(task_dependency, timeout=3600)
810
811 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
812
813 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
814
815 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
816 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
817 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
818 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
819 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
820 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
821 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
822 self.update_db_2("nsrs", nsr_id, db_nsr_update)
823 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
824
825 # wait until NS is ready
826 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
827 detailed_status_old = None
828 self.logger.debug(logging_text + step)
829
830 while time() <= start_deploy + self.total_deploy_timeout:
831 desc = await self.RO.show("ns", RO_nsr_id)
832 ns_status, ns_status_info = self.RO.check_ns_status(desc)
833 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
834 if ns_status == "ERROR":
835 raise ROclient.ROClientException(ns_status_info)
836 elif ns_status == "BUILD":
837 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
838 elif ns_status == "ACTIVE":
839 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
840 try:
841 if vdu_flag:
842 self.ns_update_vnfr(db_vnfrs, desc)
843 break
844 except LcmExceptionNoMgmtIP:
845 pass
846 else:
847 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
848 if detailed_status != detailed_status_old:
849 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
851 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
852 await asyncio.sleep(5, loop=self.loop)
853 else: # total_deploy_timeout
854 raise ROclient.ROClientException("Timeout waiting ns to be ready")
855
856 step = "Updating NSR"
857 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
858
859 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
860 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
861 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
862 self.update_db_2("nsrs", nsr_id, db_nsr_update)
863 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
864
865 step = "Deployed at VIM"
866 self.logger.debug(logging_text + step)
867
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait until the target VM has an IP address and, optionally, insert a public ssh key into it through RO.

    The database is polled every 10 seconds. First the RO id of the ns is looked up at the nsr record
    (_admin.deployed.RO.nsr_id); then the vnfr record is read until the target vdur is ACTIVE and has an
    ip-address. When both pub_key and user are provided, the key is injected with an RO action, retrying up to
    20 times on ROClientException.

    :param logging_text: prefix use for logging
    :param nsr_id: _id of the nsr record
    :param vnfr_id: _id of the vnfr record
    :param vdu_id: vdu-id-ref of the target vdur. If empty, the vdur owning the vnfr "ip-address" is used (VNF case)
    :param vdu_index: count-index of the target vdur. Only used when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, if the vdur is not found or is in ERROR status, if RO returns an unknown
        response or if the maximum number of key injection retries is reached
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 360 polls of 10 seconds: 1 hour
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            # the ns is already known at RO, so what has not arrived on time is the VM itself
            raise LcmException("Timeout waiting for target VM of vnfr_id={}, vdu_id={}, vdu_index={}".format(
                vnfr_id, vdu_id, vdu_index))

        # NOTE(review): the explicit 'loop' argument is kept as in the rest of the file; asyncio.sleep no
        # longer accepts it from Python 3.10 on
        await asyncio.sleep(10, loop=self.loop)
        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        # inject public key into machine
        if pub_key and user:
            # self.logger.debug(logging_text + "Inserting RO key")
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                result_dict = await self.RO.create_action(
                    item="ns",
                    item_id_name=ro_nsr_id,
                    descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                )
                # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                if not result_dict or not isinstance(result_dict, dict):
                    raise LcmException("Unknown response from RO when injecting key")
                # only one VM is requested, so the first entry decides: 200 means injected, anything else
                # is turned into an ROClientException that triggers a retry below
                for result in result_dict.values():
                    if result.get("vim_result") == 200:
                        break
                    else:
                        raise ROclient.ROClientException("error injecting key: {}".format(
                            result.get("description")))
                break
            except ROclient.ROClientException as e:
                if not nb_tries:
                    # log only the first failure to avoid flooding the log while retrying
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            break

    return ip_address
964
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Block until the VCA deployments this one depends on have finished.

    An entry with "vdu_id" or "kdu_name" does not wait for anything. An entry with "member-vnf-index" waits
    for the other entries with the same member-vnf-index. An entry without it (NS level) waits for all other
    entries. The list is re-read from the nsr record every 10 seconds, for at most 301 checks.

    :param nsr_id: _id of the nsr record used to refresh the VCA list
    :param vca_deployed_list: current content of _admin.deployed.VCA
    :param vca_index: position of the own entry inside vca_deployed_list
    :raises LcmException: if a dependency is in "FAILED" instantiation or the checks are exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        return

    polls_left = 300
    while polls_left >= 0:
        own_member_index = own_vca.get("member-vnf-index")
        still_pending = False
        for position, other_vca in enumerate(vca_deployed_list):
            if position == vca_index:
                continue
            if own_member_index and other_vca.get("member-vnf-index") != own_member_index:
                # entry of a different VNF: not a dependency
                continue
            state = other_vca.get("instantiation")
            if not state:
                # dependency not finished yet: wait and check again
                still_pending = True
                break
            if state == "FAILED":
                raise LcmException("Configuration aborted because dependent charm/s has failed")
        if not still_pending:
            return

        await asyncio.sleep(10)
        polls_left -= 1
        refreshed_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = refreshed_nsr["_admin"]["deployed"]["VCA"]

    raise LcmException("Configuration aborted because dependent charm/s timeout")
991
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
                           kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
    """
    Deploy and configure one charm at VCA: create or register the execution environment, install the charm,
    wait for the VM (injecting the ssh key if required) and execute the initial config primitives.

    The entry _admin.deployed.VCA.<vca_index> of the nsr record is updated with "model", "application" and
    "ee_id", and its "instantiation" field is set to "COMPLETED" or "FAILED" at the end.

    :param logging_text: prefix use for logging
    :param vca_index: index of this deployment inside db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: first component of the namespace; an empty string is used when not provided
    :param db_nsr: nsr database record
    :param db_vnfr: vnfr database record, or None/empty for a NS level charm
    :param vdu_id: vdu identifier, or None when the charm is not for a VDU
    :param kdu_name: not used by this method
    :param vdu_index: index of the vdu; 0 is used for the namespace when empty
    :param config_descriptor: configuration section of the descriptor; "juju", "config-access" and
        "initial-config-primitive" are read from it
    :param deploy_params: dictionary of parameters for the primitives. It is modified in place: "rw_mgmt_ip"
        and, for NS level charms, "ns_config_info" are added
    :param base_folder: dictionary with the keys "folder" and "pkg-dir" used to build the charm path
    :return: None
    :raises Exception: any failure is re-raised as Exception prefixed with the step being executed
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    step = ""
    try:
        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)
        if vnfr_id:
            namespace += "." + vnfr_id
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor explicitly contains juju.proxy = False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:
            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)
        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"
        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM"
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'. Skipped when the descriptor has none, so that a missing
        # list is not reported as an error; a wrong 'seq' is still only logged and the order left as is
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                self.logger.error(logging_text + step + ": " + str(e))

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "COMPLETED"})
        self.logger.debug(logging_text + step)

    except Exception as e:  # TODO not use Exception but N2VC exception
        # mark the deployment as failed so that dependent deployments stop waiting for it
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1151
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None):
    """
    Write the NS state and current operation fields into the nsr database record.

    This is a best-effort update: any exception while writing is logged as a warning and not propagated.

    :param nsr_id: _id of the nsr record to update
    :param ns_state: value for "nsState". When empty or None the stored nsState is left untouched
    :param current_operation: value for "currentOperation"
    :param current_operation_id: value for "currentOperationID"
    :param error_description: value for "errorDescription" (None by default)
    :param error_detail: value for "errorDetail" (None by default)
    :return: None
    """
    try:
        db_dict = dict()
        if ns_state:
            db_dict["nsState"] = ns_state
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing NS status: {}'.format(e))
1165
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiate a network service: read the records from database, launch the KDU, RO and N2VC deployment
    tasks, wait for them and write the final status at the nsr and nslcmop records.

    When finished, a "ns"/"instantiated" message with the operation state is written through self.msg, and
    the task is removed from self.lcm_tasks. Nothing is done if the HA lock of the operation cannot be taken.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me')
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "instantiate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}     # vnf's info indexed by member-index
    # n2vc_info = {}
    task_instantiation_list = []
    task_instantiation_info = {}  # from task to info text
    exc = None
    # Both are read at the 'finally' clause, so they must exist even if an exception is raised before the
    # deployment tasks are awaited
    instantiated_ok = False
    error_text = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)

        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id
        )

        # read from db: operation
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        # read from db: ns
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd is replicated into ns (no db read)
        nsd = db_nsr["nsd"]
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        step = "Getting vnfrs from db"
        self.logger.debug(logging_text + step)
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}     # every vnfd data indexed by vnf name
        db_vnfds = {}         # every vnfd data indexed by vnf id
        db_vnfds_index = {}   # every vnfd data indexed by vnf member-index

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr   # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]                       # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]                     # vnfd name for this vnf
            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from cb
                step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + step)
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd     # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd          # vnfd's indexed by id
            db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]  # vnfd's indexed by member-index

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        task_kdu = asyncio.ensure_future(
            self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                db_nsr=db_nsr,
                db_vnfrs=db_vnfrs,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
        task_instantiation_info[task_kdu] = "Deploy KDUs"
        task_instantiation_list.append(task_kdu)
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        # n2vc_redesign STEP 2 Deploy Network Scenario
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        task_instantiation_info[task_ro] = "Deploy at VIM"
        task_instantiation_list.append(task_ro)

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        step = "Looking for needed vnfd to configure with proxy charm"
        self.logger.debug(logging_text + step)

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())

            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_list=task_instantiation_list,
                    task_instantiation_info=task_instantiation_info
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    deploy_params_vdu = deploy_params
                if descriptor_config and descriptor_config.get("juju"):
                    # look for vdu index in the db_vnfr["vdu"] section
                    # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
                    #     if vdur["vdu-id-ref"] == vdu_id:
                    #         break
                    # else:
                    #     raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
                    #                        "member_vnf_index={}".format(vdu_id, member_vnf_index))
                    # vdu_name = vdur.get("name")
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_list=task_instantiation_list,
                            task_instantiation_info=task_instantiation_info
                        )
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config and descriptor_config.get("juju"):
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    # look for vdu index in the db_vnfr["vdu"] section
                    # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
                    #     if vdur["vdu-id-ref"] == vdu_id:
                    #         break
                    # else:
                    #     raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
                    #                        "member_vnf_index={}".format(vdu_id, member_vnf_index))
                    # vdu_name = vdur.get("name")
                    # vdu_name = None

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_list=task_instantiation_list,
                        task_instantiation_info=task_instantiation_info
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_list=task_instantiation_list,
                task_instantiation_info=task_instantiation_info
            )

        # Wait until all tasks of "task_instantiation_list" have been finished

        # while time() <= start_deploy + self.total_deploy_timeout:
        error_text_list = []
        timeout = 3600

        # let's begin with all OK
        instantiated_ok = True
        # let's begin with RO 'running' status (later we can change it)
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"

        if task_instantiation_list:
            # wait for all tasks completion
            done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)

            for task in pending:
                instantiated_ok = False
                if task == task_ro:
                    db_nsr_update["operational-status"] = "failed"
                else:
                    db_nsr_update["config-status"] = "failed"
                self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
                error_text_list.append(task_instantiation_info[task] + ": Timeout")
            for task in done:
                if task.cancelled():
                    instantiated_ok = False
                    if task == task_ro:
                        db_nsr_update["operational-status"] = "failed"
                    else:
                        db_nsr_update["config-status"] = "failed"
                    self.logger.warning(logging_text + task_instantiation_info[task] + ": Cancelled")
                    error_text_list.append(task_instantiation_info[task] + ": Cancelled")
                else:
                    # a dedicated name is used so that the failure of a task does not overwrite 'exc', which
                    # is reserved for exceptions of this method and is what the 'finally' clause inspects
                    task_exc = task.exception()
                    if task_exc:
                        instantiated_ok = False
                        if task == task_ro:
                            db_nsr_update["operational-status"] = "failed"
                        else:
                            db_nsr_update["config-status"] = "failed"
                        self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
                        if isinstance(task_exc, (N2VCException, ROclient.ROClientException)):
                            error_text_list.append(task_instantiation_info[task] + ": {}".format(task_exc))
                        else:
                            exc_traceback = "".join(traceback.format_exception(None, task_exc,
                                                                               task_exc.__traceback__))
                            self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
                            error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
                    else:
                        self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")

        if error_text_list:
            error_text = "\n".join(error_text_list)
            db_nsr_update["detailed-status"] = error_text
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
            db_nslcmop_update["detailed-status"] = error_text
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            # all is done
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            db_nslcmop_update["detailed-status"] = "done"
            db_nsr_update["detailed-status"] = "done"

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                             exc_info=True)
    finally:
        if exc:
            # an exception of this method always means that the ns is not ready, whatever was computed before
            instantiated_ok = False
            if not error_text:
                error_text = "{}".format(exc)
            if db_nsr:
                db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["config-status"] = "failed"
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
                ns_state = None
                error_description = None
                error_detail = None
                if instantiated_ok:
                    ns_state = "READY"
                else:
                    ns_state = "BROKEN"
                    error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
                    error_detail = error_text
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description,
                    error_detail=error_detail
                )

            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1605
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """
    Launch the KDUs (helm-chart or juju-bundle) listed at the "kdur" of every vnfr and wait until they finish.

    One record per KDU is stored at nsrs "_admin.deployed.K8s.<index>". When the installation task ends, the
    record gets either the "kdu-instance" returned by the connector or a "detailed-status" with the error.
    :param logging_text: prefix to use at logging
    :param nsr_id: _id of the nsrs database entry to update
    :param db_nsr: content of the nsr (currently not used by this method)
    :param db_vnfrs: dictionary whose values are the vnfr contents
    :return: None
    :raises LcmException: on any unexpected error while launching or waiting for the KDUs
    """
    # cache of already resolved clusters: {cluster_type: {k8scluster _id: id stored at _admin.<cluster_type>.id}}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    # 'step' is reported by the exception handler, so it must exist before anything can fail
    step = "Initializing KDU deployment records"
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                # Every KDU (also the ones that cannot be launched) consumes its own slot, otherwise the
                # record of a failed KDU would be overwritten by the next one
                kdu_path = "_admin.deployed.K8s.{}".format(index)
                index += 1

                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
                                 " running"
                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                except LcmException as e:
                    error_text = str(e)
                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update[kdu_path] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": kdu_path}
                # Both connectors are wrapped in a task: asyncio.wait() hands back task objects, so the
                # keys of pending_tasks must be those very same objects
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = kdu_path + "."
        if not pending_tasks:
            return
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:   # timeout
                for task in pending_list:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)) \
            from e
    finally:
        # TODO Write in data base
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1707
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_list, task_instantiation_info):
    """
    Launch instantiate_N2VC in an asyncio task and register the task object.

    The entry of this charm is looked up at db_nsr["_admin"]["deployed"]["VCA"]. If it is not there, a new
    entry is appended to that list and written to the database at nsrs "_admin.deployed.VCA.<index>".
    The created task is registered at self.lcm_tasks, appended to task_instantiation_list and annotated at
    task_instantiation_info with a descriptive text.
    :return: None
    """
    deployed_vca_list = db_nsr["_admin"]["deployed"]["VCA"]

    def _is_this_vca(entry):
        # empty (None) slots are skipped; the rest is matched by vnf, vdu, kdu and vdu replica index
        if not entry:
            return False
        return (entry.get("member-vnf-index") == member_vnf_index and
                entry.get("vdu_id") == vdu_id and
                entry.get("kdu_name") == kdu_name and
                entry.get("vdu_count_index", 0) == vdu_index)

    # fill db_nsr._admin.deployed.VCA.<index>
    vca_index = next((position for position, entry in enumerate(deployed_vca_list) if _is_this_vca(entry)),
                     None)
    if vca_index is None:
        # not found, create one at the end of the list
        vca_index = len(deployed_vca_list)
        new_vca_entry = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",   # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }
        self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): new_vca_entry})
        deployed_vca_list.append(new_vca_entry)

    # Launch task
    n2vc_coroutine = self.instantiate_N2VC(
        logging_text=logging_text,
        vca_index=vca_index,
        nsi_id=nsi_id,
        db_nsr=db_nsr,
        db_vnfr=db_vnfr,
        vdu_id=vdu_id,
        kdu_name=kdu_name,
        vdu_index=vdu_index,
        deploy_params=deploy_params,
        config_descriptor=descriptor_config,
        base_folder=base_folder,
    )
    task_n2vc = asyncio.ensure_future(n2vc_coroutine)
    task_name = "instantiate_N2VC-{}".format(vca_index)
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, task_name, task_n2vc)
    task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
    task_instantiation_list.append(task_n2vc)
1761
1762 # Check if this VNFD has a configured terminate action
1763 def _has_terminate_config_primitive(self, vnfd):
1764 vnf_config = vnfd.get("vnf-configuration")
1765 if vnf_config and vnf_config.get("terminate-config-primitive"):
1766 return True
1767 else:
1768 return False
1769
1770 @staticmethod
1771 def _get_terminate_config_primitive_seq_list(vnfd):
1772 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1773 # No need to check for existing primitive twice, already done before
1774 vnf_config = vnfd.get("vnf-configuration")
1775 seq_list = vnf_config.get("terminate-config-primitive")
1776 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1777 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1778 return seq_list_sorted
1779
1780 @staticmethod
1781 def _create_nslcmop(nsr_id, operation, params):
1782 """
1783 Creates a ns-lcm-opp content to be stored at database.
1784 :param nsr_id: internal id of the instance
1785 :param operation: instantiate, terminate, scale, action, ...
1786 :param params: user parameters for the operation
1787 :return: dictionary following SOL005 format
1788 """
1789 # Raise exception if invalid arguments
1790 if not (nsr_id and operation and params):
1791 raise LcmException(
1792 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1793 now = time()
1794 _id = str(uuid4())
1795 nslcmop = {
1796 "id": _id,
1797 "_id": _id,
1798 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1799 "operationState": "PROCESSING",
1800 "statusEnteredTime": now,
1801 "nsInstanceId": nsr_id,
1802 "lcmOperationType": operation,
1803 "startTime": now,
1804 "isAutomaticInvocation": False,
1805 "operationParams": params,
1806 "isCancelPending": False,
1807 "links": {
1808 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1809 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1810 }
1811 }
1812 return nslcmop
1813
1814 def _format_additional_params(self, params):
1815 params = params or {}
1816 for key, value in params.items():
1817 if str(value).startswith("!!yaml "):
1818 params[key] = yaml.safe_load(value[7:])
1819 return params
1820
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Compute the parameters to execute one terminate primitive.
    :param seq: item of the vnfd "terminate-config-primitive" list that describes the primitive
    :param vnf_index: member-vnf-index of the vnf the primitive belongs to
    :return: what _map_primitive_params returns for this primitive, using no instantiation params
    """
    user_params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        "primitive_params": {},
    }
    no_instantiation_params = {}
    return self._map_primitive_params(seq, user_params, no_instantiation_params)
1831
1832 # sub-operations
1833
1834 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1835 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1836 if (op.get('operationState') == 'COMPLETED'):
1837 # b. Skip sub-operation
1838 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1839 return self.SUBOPERATION_STATUS_SKIP
1840 else:
1841 # c. Reintent executing sub-operation
1842 # The sub-operation exists, and operationState != 'COMPLETED'
1843 # Update operationState = 'PROCESSING' to indicate a reintent.
1844 operationState = 'PROCESSING'
1845 detailed_status = 'In progress'
1846 self._update_suboperation_status(
1847 db_nslcmop, op_index, operationState, detailed_status)
1848 # Return the sub-operation index
1849 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1850 # with arguments extracted from the sub-operation
1851 return op_index
1852
1853 # Find a sub-operation where all keys in a matching dictionary must match
1854 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1855 def _find_suboperation(self, db_nslcmop, match):
1856 if (db_nslcmop and match):
1857 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1858 for i, op in enumerate(op_list):
1859 if all(op.get(k) == match[k] for k in match):
1860 return i
1861 return self.SUBOPERATION_STATUS_NOT_FOUND
1862
1863 # Update status for a sub-operation given its index
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    Write at database (collection "nslcmops") the status of one sub-operation.
    :param db_nslcmop: nslcmop content; only its '_id' is used
    :param op_index: index of the sub-operation at "_admin.operations"
    :param operationState: value to store at 'operationState' of the sub-operation
    :param detailed_status: value to store at 'detailed-status' of the sub-operation
    :return: None
    """
    # Update DB for HA tasks
    suboperation_path = '_admin.operations.{}.'.format(op_index)
    self.db.set_one(
        "nslcmops",
        q_filter={'_id': db_nslcmop['_id']},
        update_dict={
            suboperation_path + 'operationState': operationState,
            suboperation_path + 'detailed-status': detailed_status,
        },
        fail_on_empty=False)
1873
1874 # Add sub-operation, return the index of the added sub-operation
1875 # Optionally, set operationState, detailed-status, and operationType
1876 # Status and type are currently set for 'scale' sub-operations:
1877 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1878 # 'detailed-status' : status message
1879 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1880 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1881 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1882 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1883 RO_nsr_id=None, RO_scaling_info=None):
1884 if not (db_nslcmop):
1885 return self.SUBOPERATION_STATUS_NOT_FOUND
1886 # Get the "_admin.operations" list, if it exists
1887 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1888 op_list = db_nslcmop_admin.get('operations')
1889 # Create or append to the "_admin.operations" list
1890 new_op = {'member_vnf_index': vnf_index,
1891 'vdu_id': vdu_id,
1892 'vdu_count_index': vdu_count_index,
1893 'primitive': primitive,
1894 'primitive_params': mapped_primitive_params}
1895 if operationState:
1896 new_op['operationState'] = operationState
1897 if detailed_status:
1898 new_op['detailed-status'] = detailed_status
1899 if operationType:
1900 new_op['lcmOperationType'] = operationType
1901 if RO_nsr_id:
1902 new_op['RO_nsr_id'] = RO_nsr_id
1903 if RO_scaling_info:
1904 new_op['RO_scaling_info'] = RO_scaling_info
1905 if not op_list:
1906 # No existing operations, create key 'operations' with current operation as first list element
1907 db_nslcmop_admin.update({'operations': [new_op]})
1908 op_list = db_nslcmop_admin.get('operations')
1909 else:
1910 # Existing operations, append operation to list
1911 op_list.append(new_op)
1912
1913 db_nslcmop_update = {'_admin.operations': op_list}
1914 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1915 op_index = len(op_list) - 1
1916 return op_index
1917
1918 # Helper methods for scale() sub-operations
1919
1920 # pre-scale/post-scale:
1921 # Check for 3 different cases:
1922 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1923 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1924 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Look for the sub-operation of a scale (pre-scale, post-scale or RO scaling) and add it if it is new.

    When both RO_nsr_id and RO_scaling_info are provided the sub-operation is a RO scaling: it is searched
    by those values and its type is forced to 'SCALE-RO'. Otherwise it is searched by primitive, primitive
    params and operationType.
    :return: one of
        SUBOPERATION_STATUS_NEW: not found; it has been added with state 'PROCESSING'
        SUBOPERATION_STATUS_SKIP: found and already 'COMPLETED'
        the sub-operation index: found but not 'COMPLETED', so that it must be executed again
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    match = {'member_vnf_index': vnf_index}
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match['RO_nsr_id'] = RO_nsr_id
        match['RO_scaling_info'] = RO_scaling_info
    else:
        match['primitive'] = vnf_config_primitive
        match['primitive_params'] = primitive_params
        match['lcmOperationType'] = operationType

    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Existing sub-operation: either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._reintent_or_skip_suboperation(db_nslcmop, op_index)

    # New sub-operation, add it. The caller will execute it as usual, with non-modified arguments.
    # Only the set of values that corresponds to the kind of scaling is stored
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,   # vdu_id: None for all kind of scaling
        None,   # vdu_count_index: None for all kind of scaling
        None,   # vdu_name: None for all kind of scaling
        vnf_config_primitive,
        primitive_params,
        'PROCESSING',   # initial operationState
        'In progress',  # initial detailed-status
        operationType,
        RO_nsr_id,
        RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
1978
1979 # Function to return execution_environment id
1980
1981 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
1982 for vca in vca_deployed_list:
1983 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
1984 return vca["ee_id"]
1985
1986 # Helper methods for terminate()
1987
async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
    """
    Execute the terminate primitives declared at the vnfds of the ns.
    Called from terminate() before deleting the instance.

    For every vnfr of the ns whose vnfd has a "terminate-config-primitive", the primitives are executed in
    'seq' order: each one is annotated as a sub-operation of the nslcmop and then run with
    n2vc.exec_primitive at the execution environment found for the vnf.
    :param db_nslcmop: nslcmop content. Sub-operations are added to it
    :param nslcmop_id: id of the operation, used for logging
    :param nsr_id: _id of the ns instance
    :return: None
    :raises LcmException: if the execution of a primitive fails
    """
    logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
    db_vnfds = {}   # cache of vnfds already read, indexed by vnfd _id
    # Loop over VNFRs
    for vnfr in db_vnfrs_list:
        vnfd_id = vnfr["vnfd-id"]
        vnf_index = vnfr["member-vnf-index-ref"]
        if vnfd_id not in db_vnfds:
            db_vnfds[vnfd_id] = self.db.get_one("vnfds", {"_id": vnfd_id})
        vnfd = db_vnfds[vnfd_id]
        if not self._has_terminate_config_primitive(vnfd):
            continue
        # Get the primitive's sorted sequence list
        seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
        for seq in seq_list:
            # For each sequence in list, get the primitive and execute it
            step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                vnf_index, seq.get("name"))
            self.logger.debug(logging_text + step)
            # Create the primitive for each sequence, i.e. "primitive": "touch"
            primitive = seq.get('name')
            mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
            # The following 3 parameters are taken from the operation params; they are None when the
            # operation does not provide them
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            vdu_name = db_nslcmop["operationParams"].get("vdu_name")
            # Add sub-operation. The arguments must follow the _add_suboperation signature, that does not
            # contain the nslcmop_id: passing it would shift all the values one position
            self._add_suboperation(db_nslcmop,
                                   vnf_index,
                                   vdu_id,
                                   vdu_count_index,
                                   vdu_name,
                                   primitive,
                                   mapped_primitive_params)
            # Sub-operations: the primitive is executed directly at the execution environment instead of
            # creating a new 'action' operation
            ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
            try:
                await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=mapped_primitive_params
                )
            except Exception as e:
                self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
                raise LcmException(
                    "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
                    .format(vnf_index, seq.get("name"), e),
                ) from e
2059
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance: run the terminate primitives, delete the execution environments, uninstall
    the KDUs and delete the ns, nsd and vnfds from RO.

    The progress and the final result are written at the nsrs and nslcmops database entries, and a
    "terminated" message is sent to the "ns" topic when the operation reaches a final state.
    :param nsr_id: _id of the ns instance to terminate
    :param nslcmop_id: _id of the terminate operation
    :return: None. Errors are not raised; they are annotated at database
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "terminate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    pending_tasks = []
    # Must exist before entering the try: the finally clause reads it also when the operation is
    # aborted by an exception or by the early return
    terminate_ok = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to delete; this is not a failure
            terminate_ok = True
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        # Call internal terminate action
        await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        # remove NS
        try:
            step = "delete execution environment"
            self.logger.debug(logging_text + step)

            task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            pending_tasks.append(task_delete_ee)
        except Exception as e:
            msg = "Failed while deleting NS in VCA: {}".format(e)
            self.logger.error(msg)
            failed_detail.append(msg)

        try:
            # Delete from k8scluster
            step = "delete kdus"
            self.logger.debug(logging_text + step)
            # print(nsr_deployed)
            if nsr_deployed:
                for kdu in nsr_deployed.get("K8s", ()):
                    kdu_instance = kdu.get("kdu-instance")
                    if not kdu_instance:
                        continue
                    if kdu.get("k8scluster-type") == "chart":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    elif kdu.get("k8scluster-type") == "juju":
                        task_delete_kdu_instance = asyncio.ensure_future(
                            self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu_instance))
                    else:
                        self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                          format(kdu.get("k8scluster-type")))
                        continue
                    pending_tasks.append(task_delete_kdu_instance)
        except LcmException as e:
            msg = "Failed while deleting KDUs from NS: {}".format(e)
            self.logger.error(msg)
            failed_detail.append(msg)

        # remove from RO
        RO_fail = False

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns from VIM"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await self.RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=RO_nsr_id,
                        extra_item="action",
                        extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd from RO"
                await self.RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await self.RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            terminate_ok = False
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            terminate_ok = True
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                if terminate_ok:
                    ns_state = "IDLE"
                    error_description = None
                    error_detail = None
                else:
                    ns_state = "BROKEN"
                    error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
                    error_detail = "; ".join(failed_detail)

                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description,
                    error_detail=error_detail
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        # wait for pending tasks
        done = None
        pending = None
        if pending_tasks:
            self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
            done, pending = await asyncio.wait(pending_tasks, timeout=3600)
            if not pending:
                self.logger.debug(logging_text + 'All tasks finished...')
            else:
                self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2350
2351 @staticmethod
2352 def _map_primitive_params(primitive_desc, params, instantiation_params):
2353 """
2354 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2355 The default-value is used. If it is between < > it look for a value at instantiation_params
2356 :param primitive_desc: portion of VNFD/NSD that describes primitive
2357 :param params: Params provided by user
2358 :param instantiation_params: Instantiation params provided by user
2359 :return: a dictionary with the calculated params
2360 """
2361 calculated_params = {}
2362 for parameter in primitive_desc.get("parameter", ()):
2363 param_name = parameter["name"]
2364 if param_name in params:
2365 calculated_params[param_name] = params[param_name]
2366 elif "default-value" in parameter or "value" in parameter:
2367 if "value" in parameter:
2368 calculated_params[param_name] = parameter["value"]
2369 else:
2370 calculated_params[param_name] = parameter["default-value"]
2371 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2372 and calculated_params[param_name].endswith(">"):
2373 if calculated_params[param_name][1:-1] in instantiation_params:
2374 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2375 else:
2376 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2377 format(calculated_params[param_name], primitive_desc["name"]))
2378 else:
2379 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2380 format(param_name, primitive_desc["name"]))
2381
2382 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2383 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2384 width=256)
2385 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2386 calculated_params[param_name] = calculated_params[param_name][7:]
2387
2388 # add always ns_config_info if primitive name is config
2389 if primitive_desc["name"] == "config":
2390 if "ns_config_info" in instantiation_params:
2391 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2392 return calculated_params
2393
2394 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2395 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2396
2397 # find vca_deployed record for this action
2398 try:
2399 for vca_deployed in db_deployed["VCA"]:
2400 if not vca_deployed:
2401 continue
2402 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2403 continue
2404 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2405 continue
2406 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2407 continue
2408 break
2409 else:
2410 # vca_deployed not found
2411 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2412 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2413
2414 # get ee_id
2415 ee_id = vca_deployed.get("ee_id")
2416 if not ee_id:
2417 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2418 "execution environment"
2419 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2420
2421 if primitive == "config":
2422 primitive_params = {"params": primitive_params}
2423
2424 while retries >= 0:
2425 try:
2426 output = await self.n2vc.exec_primitive(
2427 ee_id=ee_id,
2428 primitive_name=primitive,
2429 params_dict=primitive_params
2430 )
2431 # execution was OK
2432 break
2433 except Exception as e:
2434 retries -= 1
2435 if retries >= 0:
2436 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2437 # wait and retry
2438 await asyncio.sleep(retries_interval, loop=self.loop)
2439 else:
2440 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2441
2442 return output, 'OK'
2443
2444 except Exception as e:
2445 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2446
async def action(self, nsr_id, nslcmop_id):
    """
    Task that executes the "action" operation nslcmop_id over the ns nsr_id. Depending on the operationParams
    it runs a config primitive of a vdu, vnf or ns (by means of _ns_execute_primitive), or one of the KDU
    primitives "upgrade", "rollback" or "status" (by means of the helm or juju k8s connector).
    The result is written at database "nslcmops" and, when there is an operation state, it is notified with
    an "actioned" message.
    :param nsr_id: _id of the ns record
    :param nslcmop_id: _id of the operation record
    :return: None if the HA lock is not obtained; otherwise the tuple (operation state, operation state detail)
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        operation_params = db_nslcmop["operationParams"]
        vnf_index = operation_params.get("member_vnf_index")
        vdu_id = operation_params.get("vdu_id")
        kdu_name = operation_params.get("kdu_name")
        vdu_count_index = operation_params.get("vdu_count_index")
        vdu_name = operation_params.get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")    # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
            if not kdur:
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name != kdu["kdu-name"]:
                    continue
                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                # helm and juju connectors are called with the same arguments for the three primitives
                if kdu.get("k8scluster-type") == "chart":
                    k8s_connector = self.k8sclusterhelm
                elif kdu.get("k8scluster-type") == "juju":
                    k8s_connector = self.k8sclusterjuju
                else:
                    msg = "k8scluster-type not defined"
                    raise LcmException(msg)

                if primitive == "upgrade":
                    if desc_params.get("kdu_model"):
                        # model provided by user: it is used as it is and it is not sent as a param
                        kdu_model = desc_params.pop("kdu_model")
                    else:
                        # deployed model: "<name>:<version>" is reduced to "<name>"
                        kdu_model = kdu.get("kdu-model")
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]
                    output = await k8s_connector.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                         kdu_instance=kdu.get("kdu-instance"),
                                                         atomic=True, kdu_model=kdu_model,
                                                         params=desc_params, db_dict=db_dict,
                                                         timeout=300)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                elif primitive == "rollback":
                    output = await k8s_connector.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu.get("kdu-instance"),
                                                          db_dict=db_dict)
                elif primitive == "status":
                    output = await k8s_connector.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                            kdu_instance=kdu.get("kdu-instance"))
                else:
                    # previously this case ended at the misleading "KDU not found" error
                    raise LcmException("Primitive '{}' is not supported for KDU '{}'".format(primitive, kdu_name))
                break
            else:
                raise LcmException("KDU '{}' not found".format(kdu_name))

            # set also the local state variables; they drive the "actioned" notification and the returned value
            if output:
                nslcmop_operation_state = 'COMPLETED'
                nslcmop_operation_state_detail = output
            else:
                nslcmop_operation_state = 'FAILED'
                nslcmop_operation_state_detail = ''
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail
            db_nslcmop_update["operationState"] = nslcmop_operation_state
            db_nslcmop_update["statusEnteredTime"] = time()
            return    # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"]) or {}
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # vdu params, when present, replace the vnf ones
                if vdur and vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"]) or {}
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]) or {})

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params or {},
                                                        desc_params))

        detailed_status = output
        if detail == 'OK':
            result = 'COMPLETED'
        else:
            result = 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the bare ones of the try block, so the caller always gets the tuple
        return nslcmop_operation_state, nslcmop_operation_state_detail
2690
2691 async def scale(self, nsr_id, nslcmop_id):
2692
2693 # Try to lock HA task here
2694 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2695 if not task_is_locked_by_me:
2696 return
2697
2698 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2699 self.logger.debug(logging_text + "Enter")
2700 # get all needed from database
2701 db_nsr = None
2702 db_nslcmop = None
2703 db_nslcmop_update = {}
2704 nslcmop_operation_state = None
2705 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2706 "_admin.current-operation": nslcmop_id,
2707 "_admin.operation-type": "scale"}
2708 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2709 exc = None
2710 # in case of error, indicates what part of scale was failed to put nsr at error status
2711 scale_process = None
2712 old_operational_status = ""
2713 old_config_status = ""
2714 vnfr_scaled = False
2715 try:
2716 # wait for any previous tasks in process
2717 step = "Waiting for previous operations to terminate"
2718 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2719
2720 self._write_ns_status(
2721 nsr_id=nsr_id,
2722 ns_state=None,
2723 current_operation="SCALING",
2724 current_operation_id=nslcmop_id
2725 )
2726
2727 step = "Getting nslcmop from database"
2728 self.logger.debug(step + " after having waited for previous tasks to be completed")
2729 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2730 step = "Getting nsr from database"
2731 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2732
2733 old_operational_status = db_nsr["operational-status"]
2734 old_config_status = db_nsr["config-status"]
2735 step = "Parsing scaling parameters"
2736 # self.logger.debug(step)
2737 db_nsr_update["operational-status"] = "scaling"
2738 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2739 nsr_deployed = db_nsr["_admin"].get("deployed")
2740
2741 #######
2742 nsr_deployed = db_nsr["_admin"].get("deployed")
2743 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2744 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2745 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2746 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2747 #######
2748
2749 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2750 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2751 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2752 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2753 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2754
2755 # for backward compatibility
2756 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2757 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2758 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2759 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2760
2761 step = "Getting vnfr from database"
2762 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2763 step = "Getting vnfd from database"
2764 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2765
2766 step = "Getting scaling-group-descriptor"
2767 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2768 if scaling_descriptor["name"] == scaling_group:
2769 break
2770 else:
2771 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2772 "at vnfd:scaling-group-descriptor".format(scaling_group))
2773
2774 # cooldown_time = 0
2775 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2776 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2777 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2778 # break
2779
2780 # TODO check if ns is in a proper status
2781 step = "Sending scale order to VIM"
2782 nb_scale_op = 0
2783 if not db_nsr["_admin"].get("scaling-group"):
2784 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2785 admin_scale_index = 0
2786 else:
2787 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2788 if admin_scale_info["name"] == scaling_group:
2789 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2790 break
2791 else: # not found, set index one plus last element and add new entry with the name
2792 admin_scale_index += 1
2793 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2794 RO_scaling_info = []
2795 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2796 if scaling_type == "SCALE_OUT":
2797 # count if max-instance-count is reached
2798 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2799 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2800 if nb_scale_op >= max_instance_count:
2801 raise LcmException("reached the limit of {} (max-instance-count) "
2802 "scaling-out operations for the "
2803 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2804
2805 nb_scale_op += 1
2806 vdu_scaling_info["scaling_direction"] = "OUT"
2807 vdu_scaling_info["vdu-create"] = {}
2808 for vdu_scale_info in scaling_descriptor["vdu"]:
2809 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2810 "type": "create", "count": vdu_scale_info.get("count", 1)})
2811 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2812
2813 elif scaling_type == "SCALE_IN":
2814 # count if min-instance-count is reached
2815 min_instance_count = 0
2816 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2817 min_instance_count = int(scaling_descriptor["min-instance-count"])
2818 if nb_scale_op <= min_instance_count:
2819 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2820 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2821 nb_scale_op -= 1
2822 vdu_scaling_info["scaling_direction"] = "IN"
2823 vdu_scaling_info["vdu-delete"] = {}
2824 for vdu_scale_info in scaling_descriptor["vdu"]:
2825 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2826 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2827 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2828
2829 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2830 vdu_create = vdu_scaling_info.get("vdu-create")
2831 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2832 if vdu_scaling_info["scaling_direction"] == "IN":
2833 for vdur in reversed(db_vnfr["vdur"]):
2834 if vdu_delete.get(vdur["vdu-id-ref"]):
2835 vdu_delete[vdur["vdu-id-ref"]] -= 1
2836 vdu_scaling_info["vdu"].append({
2837 "name": vdur["name"],
2838 "vdu_id": vdur["vdu-id-ref"],
2839 "interface": []
2840 })
2841 for interface in vdur["interfaces"]:
2842 vdu_scaling_info["vdu"][-1]["interface"].append({
2843 "name": interface["name"],
2844 "ip_address": interface["ip-address"],
2845 "mac_address": interface.get("mac-address"),
2846 })
2847 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2848
2849 # PRE-SCALE BEGIN
2850 step = "Executing pre-scale vnf-config-primitive"
2851 if scaling_descriptor.get("scaling-config-action"):
2852 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2853 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2854 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2855 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2856 step = db_nslcmop_update["detailed-status"] = \
2857 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2858
2859 # look for primitive
2860 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2861 if config_primitive["name"] == vnf_config_primitive:
2862 break
2863 else:
2864 raise LcmException(
2865 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2866 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2867 "primitive".format(scaling_group, config_primitive))
2868
2869 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2870 if db_vnfr.get("additionalParamsForVnf"):
2871 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2872
2873 scale_process = "VCA"
2874 db_nsr_update["config-status"] = "configuring pre-scaling"
2875 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2876
2877 # Pre-scale reintent check: Check if this sub-operation has been executed before
2878 op_index = self._check_or_add_scale_suboperation(
2879 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2880 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2881 # Skip sub-operation
2882 result = 'COMPLETED'
2883 result_detail = 'Done'
2884 self.logger.debug(logging_text +
2885 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2886 vnf_config_primitive, result, result_detail))
2887 else:
2888 if (op_index == self.SUBOPERATION_STATUS_NEW):
2889 # New sub-operation: Get index of this sub-operation
2890 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2891 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2892 format(vnf_config_primitive))
2893 else:
2894 # Reintent: Get registered params for this existing sub-operation
2895 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2896 vnf_index = op.get('member_vnf_index')
2897 vnf_config_primitive = op.get('primitive')
2898 primitive_params = op.get('primitive_params')
2899 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2900 format(vnf_config_primitive))
2901 # Execute the primitive, either with new (first-time) or registered (reintent) args
2902 result, result_detail = await self._ns_execute_primitive(
2903 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2904 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2905 vnf_config_primitive, result, result_detail))
2906 # Update operationState = COMPLETED | FAILED
2907 self._update_suboperation_status(
2908 db_nslcmop, op_index, result, result_detail)
2909
2910 if result == "FAILED":
2911 raise LcmException(result_detail)
2912 db_nsr_update["config-status"] = old_config_status
2913 scale_process = None
2914 # PRE-SCALE END
2915
2916 # SCALE RO - BEGIN
2917 # Should this block be skipped if 'RO_nsr_id' == None ?
2918 # if (RO_nsr_id and RO_scaling_info):
2919 if RO_scaling_info:
2920 scale_process = "RO"
2921 # Scale RO reintent check: Check if this sub-operation has been executed before
2922 op_index = self._check_or_add_scale_suboperation(
2923 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2924 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2925 # Skip sub-operation
2926 result = 'COMPLETED'
2927 result_detail = 'Done'
2928 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2929 result, result_detail))
2930 else:
2931 if (op_index == self.SUBOPERATION_STATUS_NEW):
2932 # New sub-operation: Get index of this sub-operation
2933 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2934 self.logger.debug(logging_text + "New sub-operation RO")
2935 else:
2936 # Reintent: Get registered params for this existing sub-operation
2937 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2938 RO_nsr_id = op.get('RO_nsr_id')
2939 RO_scaling_info = op.get('RO_scaling_info')
2940 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2941 vnf_config_primitive))
2942
2943 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2944 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2945 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2946 # wait until ready
2947 RO_nslcmop_id = RO_desc["instance_action_id"]
2948 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2949
2950 RO_task_done = False
2951 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2952 detailed_status_old = None
2953 self.logger.debug(logging_text + step)
2954
2955 deployment_timeout = 1 * 3600 # One hour
2956 while deployment_timeout > 0:
2957 if not RO_task_done:
2958 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2959 extra_item_id=RO_nslcmop_id)
2960 ns_status, ns_status_info = self.RO.check_action_status(desc)
2961 if ns_status == "ERROR":
2962 raise ROclient.ROClientException(ns_status_info)
2963 elif ns_status == "BUILD":
2964 detailed_status = step + "; {}".format(ns_status_info)
2965 elif ns_status == "ACTIVE":
2966 RO_task_done = True
2967 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2968 self.logger.debug(logging_text + step)
2969 else:
2970 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2971 else:
2972
2973 if ns_status == "ERROR":
2974 raise ROclient.ROClientException(ns_status_info)
2975 elif ns_status == "BUILD":
2976 detailed_status = step + "; {}".format(ns_status_info)
2977 elif ns_status == "ACTIVE":
2978 step = detailed_status = \
2979 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2980 if not vnfr_scaled:
2981 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2982 vnfr_scaled = True
2983 try:
2984 desc = await self.RO.show("ns", RO_nsr_id)
2985 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2986 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2987 break
2988 except LcmExceptionNoMgmtIP:
2989 pass
2990 else:
2991 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2992 if detailed_status != detailed_status_old:
2993 self._update_suboperation_status(
2994 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2995 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2996 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2997
2998 await asyncio.sleep(5, loop=self.loop)
2999 deployment_timeout -= 5
3000 if deployment_timeout <= 0:
3001 self._update_suboperation_status(
3002 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
3003 raise ROclient.ROClientException("Timeout waiting ns to be ready")
3004
3005 # update VDU_SCALING_INFO with the obtained ip_addresses
3006 if vdu_scaling_info["scaling_direction"] == "OUT":
3007 for vdur in reversed(db_vnfr["vdur"]):
3008 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
3009 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
3010 vdu_scaling_info["vdu"].append({
3011 "name": vdur["name"],
3012 "vdu_id": vdur["vdu-id-ref"],
3013 "interface": []
3014 })
3015 for interface in vdur["interfaces"]:
3016 vdu_scaling_info["vdu"][-1]["interface"].append({
3017 "name": interface["name"],
3018 "ip_address": interface["ip-address"],
3019 "mac_address": interface.get("mac-address"),
3020 })
3021 del vdu_scaling_info["vdu-create"]
3022
3023 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
3024 # SCALE RO - END
3025
3026 scale_process = None
3027 if db_nsr_update:
3028 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3029
3030 # POST-SCALE BEGIN
3031 # execute primitive service POST-SCALING
3032 step = "Executing post-scale vnf-config-primitive"
3033 if scaling_descriptor.get("scaling-config-action"):
3034 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3035 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3036 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3037 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3038 step = db_nslcmop_update["detailed-status"] = \
3039 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3040
3041 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3042 if db_vnfr.get("additionalParamsForVnf"):
3043 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3044
3045 # look for primitive
3046 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3047 if config_primitive["name"] == vnf_config_primitive:
3048 break
3049 else:
3050 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3051 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3052 "match any vnf-configuration:config-primitive".format(scaling_group,
3053 config_primitive))
3054 scale_process = "VCA"
3055 db_nsr_update["config-status"] = "configuring post-scaling"
3056 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3057
3058 # Post-scale reintent check: Check if this sub-operation has been executed before
3059 op_index = self._check_or_add_scale_suboperation(
3060 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3061 if op_index == self.SUBOPERATION_STATUS_SKIP:
3062 # Skip sub-operation
3063 result = 'COMPLETED'
3064 result_detail = 'Done'
3065 self.logger.debug(logging_text +
3066 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3067 format(vnf_config_primitive, result, result_detail))
3068 else:
3069 if op_index == self.SUBOPERATION_STATUS_NEW:
3070 # New sub-operation: Get index of this sub-operation
3071 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3072 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3073 format(vnf_config_primitive))
3074 else:
3075 # Reintent: Get registered params for this existing sub-operation
3076 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3077 vnf_index = op.get('member_vnf_index')
3078 vnf_config_primitive = op.get('primitive')
3079 primitive_params = op.get('primitive_params')
3080 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3081 format(vnf_config_primitive))
3082 # Execute the primitive, either with new (first-time) or registered (reintent) args
3083 result, result_detail = await self._ns_execute_primitive(
3084 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3085 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3086 vnf_config_primitive, result, result_detail))
3087 # Update operationState = COMPLETED | FAILED
3088 self._update_suboperation_status(
3089 db_nslcmop, op_index, result, result_detail)
3090
3091 if result == "FAILED":
3092 raise LcmException(result_detail)
3093 db_nsr_update["config-status"] = old_config_status
3094 scale_process = None
3095 # POST-SCALE END
3096
3097 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3098 db_nslcmop_update["statusEnteredTime"] = time()
3099 db_nslcmop_update["detailed-status"] = "done"
3100 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3101 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3102 else old_operational_status
3103 db_nsr_update["config-status"] = old_config_status
3104 return
3105 except (ROclient.ROClientException, DbException, LcmException) as e:
3106 self.logger.error(logging_text + "Exit Exception {}".format(e))
3107 exc = e
3108 except asyncio.CancelledError:
3109 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3110 exc = "Operation was cancelled"
3111 except Exception as e:
3112 exc = traceback.format_exc()
3113 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3114 finally:
3115 if exc:
3116 if db_nslcmop:
3117 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3118 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3119 db_nslcmop_update["statusEnteredTime"] = time()
3120 if db_nsr:
3121 db_nsr_update["operational-status"] = old_operational_status
3122 db_nsr_update["config-status"] = old_config_status
3123 db_nsr_update["detailed-status"] = ""
3124 db_nsr_update["_admin.nslcmop"] = None
3125 if scale_process:
3126 if "VCA" in scale_process:
3127 db_nsr_update["config-status"] = "failed"
3128 if "RO" in scale_process:
3129 db_nsr_update["operational-status"] = "failed"
3130 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3131 exc)
3132 try:
3133 if db_nslcmop and db_nslcmop_update:
3134 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3135 if db_nsr:
3136 db_nsr_update["_admin.current-operation"] = None
3137 db_nsr_update["_admin.operation-type"] = None
3138 db_nsr_update["_admin.nslcmop"] = None
3139 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3140
3141 self._write_ns_status(
3142 nsr_id=nsr_id,
3143 ns_state=None,
3144 current_operation="IDLE",
3145 current_operation_id=None
3146 )
3147
3148 except DbException as e:
3149 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3150 if nslcmop_operation_state:
3151 try:
3152 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3153 "operationState": nslcmop_operation_state},
3154 loop=self.loop)
3155 # if cooldown_time:
3156 # await asyncio.sleep(cooldown_time, loop=self.loop)
3157 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3158 except Exception as e:
3159 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3160 self.logger.debug(logging_text + "Exit")
3161 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")