Fix bug 937 NSI Instantiation
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
25
26 from osm_lcm import ROclient
27 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
28 from n2vc.k8s_helm_conn import K8sHelmConnector
29 from n2vc.k8s_juju_conn import K8sJujuConnector
30
31 from osm_common.dbbase import DbException
32 from osm_common.fsbase import FsException
33
34 from n2vc.n2vc_juju_conn import N2VCJujuConnector
35 from n2vc.exceptions import N2VCException
36
37 from copy import copy, deepcopy
38 from http import HTTPStatus
39 from time import time
40 from uuid import uuid4
41
42 __author__ = "Alfonso Tierno"
43
44
def get_iterable(in_dict, in_key):
    """
    Reads in_dict[in_key] in a form that can always be iterated over
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: the stored value, or an empty tuple if the key is not present or its value is falsy (None, False, ...)
    """
    found = in_dict.get(in_key)
    return found if found else ()
55
56
def populate_dict(target_dict, key_list, value):
    """
    Stores value inside target_dict following the nested path given by key_list. Intermediate dictionaries are
    created when missing and reused when present; the last item of key_list is the key that receives the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed (in place)
    :param key_list: list of keys describing the path inside target_dict
    :param value: content assigned at the end of the path
    :return: None
    """
    node = target_dict
    for parent_key in key_list[:-1]:
        # setdefault returns the existing child, or inserts and returns a new empty dictionary
        node = node.setdefault(parent_key, {})
    node[key_list[-1]] = value
71
72
73 class NsLcm(LcmBase):
# Class-level defaults of NsLcm. The values are products of 60/3600, so they are presumably expressed in
# seconds -- TODO confirm against the code that consumes them.
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
total_deploy_timeout = 2 * 3600  # global timeout for deployment
timeout_charm_delete = 10 * 60  # presumably the time allowed to delete a charm; its usage is not visible here
timeout_primitive = 10 * 60  # timeout for primitive execution

# Negative sentinel codes related to sub-operations. NOTE(review): the code that uses them is not visible in
# this part of the file; their meaning is only inferred from the names.
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
82
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init. Stores the shared resources and creates the N2VC, K8s (helm and juju) and RO clients
    :param db: database connector, passed to LcmBase and to the N2VC and K8s connectors
    :param msg: messaging connector, passed to LcmBase
    :param fs: filesystem storage connector, passed to LcmBase and to the N2VC and K8s connectors
    :param lcm_tasks: object stored at self.lcm_tasks
    :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
    :param vca_config: dictionary with the VCA configuration. It is modified: the content of 'pubkey', 'cacert'
        and 'apiproxy' is also made available as 'public_key', 'ca_cert' and 'api_proxy'
    :param loop: loop object handed to the N2VC connector and to the RO client
    :return: None
    """
    super().__init__(db=db, msg=msg, fs=fs, logger=logging.getLogger('lcm.ns'))

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.ro_config = ro_config
    self.vca_config = vca_config

    # publish these entries also under the key names with underscore
    for source_key, alias_key in (('pubkey', 'public_key'), ('cacert', 'ca_cert'), ('apiproxy', 'api_proxy')):
        if source_key in self.vca_config:
            self.vca_config[alias_key] = self.vca_config[source_key]

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnector(
        db=self.db,
        fs=self.fs,
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=self.vca_config.get('user', None),
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
    )

    # create the K8s connectors; no database update callback is registered for them
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # create RO client
    self.RO = ROclient.ROClient(self.loop, **self.ro_config)
141
142 def _on_update_n2vc_db(self, table, filter, path, updated_data):
143
144 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
145 .format(table, filter, path, updated_data))
146
147 return
148 # write NS status to database
149 # try:
150 # # nsrs_id = filter.get('_id')
151 # # print(nsrs_id)
152 # # get ns record
153 # nsr = self.db.get_one(table=table, q_filter=filter)
154 # # get VCA deployed list
155 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
156 # # get RO deployed
157 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
158 # for vca in vca_list:
159 # # status = vca.get('status')
160 # # print(status)
161 # # detailed_status = vca.get('detailed-status')
162 # # print(detailed_status)
163 # # for ro in ro_list:
164 # # print(ro)
165 #
166 # except Exception as e:
167 # self.logger.error('Error writing NS status to db: {}'.format(e))
168
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates the vnfd descriptor for RO based on the input OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. They are the variables used to render the
        cloud-init content of each vdu with Jinja2
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd, without the keys not used by RO and with the rendered "cloud-init" at each vdu
    :raises LcmException: if the cloud-init file cannot be read, if a variable used by the cloud-init content is
        not among additionalParams, or if Jinja2 fails processing the content
    """
    render_params = additionalParams or {}
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                           "kdu", "k8s-cluster"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file is read from the package folder of the original vnfd (the copy has lost "_admin")
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_text = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_text = vdu["cloud-init"]
            else:
                continue

            # every variable referenced by the template must come in the instantiation parameters
            parsed_content = Environment().parse(cloud_init_text)
            for var in meta.find_undeclared_variables(parsed_content):
                if not additionalParams or var not in additionalParams:
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_text).render(render_params)

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
226
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. If empty or None, nothing is generated
    :param nsd: ns descriptor. Its "constituent-vnfd" list relates each member-vnf-index with a vnfd id
    :param vnfd_dict: dictionary with the vnfd contents, indexed by the vnfd id referenced at the nsd
    :param n2vc_key_list: list stored as "mgmt_keys" of every vdu that requires ssh access. None means empty
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM or WIM account is not ENABLED, or if a parameter references a
        member-vnf-index, connection point or interface that is not present at the descriptors
    """
    # caches of OSM account id -> RO id, so that the database is queried once per account
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only strings are looked up at database; any other value (None, booleans, ...) is passed as it is
        if not isinstance(wim_account, str):
            return wim_account
        if wim_account in wim_2_RO:
            return wim_2_RO[wim_account]

        db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
        if db_wim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]))
        RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
        wim_2_RO[wim_account] = RO_wim_id
        return RO_wim_id

    def ip_profile_2_RO(ip_profile):
        # works over a copy, renaming the keys and values that are spelled differently at RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = [ds['address'] for ds in RO_ip_profile.pop("dns-server")]
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }

    # inject the keys in the vdus whose configuration requires ssh access
    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        vdu_needed_access = []
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        # get_iterable tolerates a missing or None "vdu"/"interface" entry
        for vdu in get_iterable(vnfd, "vdu"):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # the management connection point is resolved to the first vdu that exposes it
                for vdu_interface in get_iterable(vdu, "interface"):
                    if vdu_interface.get("external-connection-point-ref") == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}

    # instantiation parameters provided per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        vnf_index = vnf_params["member-vnf-index"]
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_index:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_index))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus", vdu_params["id"], "devices",
                                                     volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    # OSM parameter name -> name used at the RO descriptor
                    for osm_key, RO_key in (("ip-address", "ip_address"),
                                            ("mac-address", "mac_address"),
                                            ("floating-ip-required", "floating-ip")):
                        if interface_params.get(osm_key):
                            populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus", vdu_params["id"],
                                                         "interfaces", interface_params["name"], RO_key),
                                          interface_params[osm_key])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            for vim_net_key in ("vim-network-name", "vim-network-id"):
                if internal_vld_params.get(vim_net_key):
                    populate_dict(RO_ns_params, ("vnfs", vnf_index, "networks",
                                                 internal_vld_params["name"], vim_net_key),
                                  internal_vld_params[vim_net_key])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_index, "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):
                populate_dict(RO_ns_params, ("vnfs", vnf_index, "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_index, "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu interface is attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_index, icp_params["id-ref"]))

    # instantiation parameters provided per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))

        # NOTE: both parameters are stored at "sites"; if both are provided, "vim-network-id" prevails
        for vim_net_key in ("vim-network-name", "vim-network-id"):
            if not vld_params.get(vim_net_key):
                continue
            RO_vld_sites = []
            if isinstance(vld_params[vim_net_key], dict):
                for vim_account, vim_net in vld_params[vim_net_key].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params[vim_net_key]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                # every item is written at the same "use-network" key, so the last one is the one kept
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)

        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor keep the values of the matching iteration
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
497
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr to reflect a scaling operation and stores the result at database
    :param db_vnfr: vnfr content. Its "vdur" list is modified in place
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. Can be None. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. Can be None. It is not modified
    :return: None
    :raises LcmException: if a requested vdu-id-ref has no vdur to clone (scale out) or there are fewer vdur
        than the ones requested to remove (scale in)
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # the list is walked backwards, so that insertions and deletions do not move the items still to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # clone the first vdur found for this vdu; each clone gets a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done: served requests have been removed from both dictionaries
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
536
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into the nsr the information that RO reports for each one of its vld
    :param ns_update_nsr: dictionary to be filled with the updated info, with entries "vld.<index>": <vld>
    :param db_nsr: content of db_nsr. Its "vld" items are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" list is the one inspected
    :return: Nothing, LcmException is raised if a vld is not found at the RO info
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that declares this vld as its origin
        net_RO = next((net for net in get_iterable(nsr_desc_RO, "nets")
                       if vld["id"] == net.get("ns_net_osm_id")), None)
        if net_RO is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        for vld_key, RO_key in (("vim-id", "vim_net_id"), ("name", "vim_name"), ("status", "status"),
                                ("status-detailed", "error_msg")):
            vld[vld_key] = net_RO.get(RO_key)
        ns_update_nsr["vld.{}".format(vld_index)] = vld
558
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Refreshes every vnfr with the RO info (ip_address, vim_id, status...) and stores the changes at database.
    The content of db_vnfrs is updated as well
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        vnf_RO = next((item for item in nsr_desc_RO["vnfs"] if item["member_vnf_index"] == vnf_index), None)
        if vnf_RO is None:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))

        vnfr_update = {}
        if vnf_RO.get("ip_address"):
            # only the content before the first ';' is kept
            db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
        elif not db_vnfr.get("ip-address"):
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

        for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
            if vdur.get("pdu-type"):
                continue
            # the RO vm of a vdur is the one placed at position count-index among the vms of the same vdu
            same_vdu_vms = (vm for vm in get_iterable(vnf_RO, "vms") if vdur["vdu-id-ref"] == vm["vdu_osm_id"])
            vdur_RO = next((vm for position, vm in enumerate(same_vdu_vms)
                            if vdur["count-index"] == position), None)
            if vdur_RO is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                   "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            vdur["vim-id"] = vdur_RO.get("vim_vm_id")
            vm_ip_address = vdur_RO.get("ip_address")
            vdur["ip-address"] = vm_ip_address.split(";")[0] if vm_ip_address else None
            for vdur_key, RO_key in (("vdu-id-ref", "vdu_osm_id"), ("name", "vim_name"), ("status", "status"),
                                     ("status-detailed", "error_msg")):
                vdur[vdur_key] = vdur_RO.get(RO_key)

            for ifacer in get_iterable(vdur, "interfaces"):
                interface_RO = next((iface for iface in get_iterable(vdur_RO, "interfaces")
                                     if ifacer["name"] == iface.get("internal_name")), None)
                if interface_RO is None:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                       "from VIM info"
                                       .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                ifacer["ip-address"] = interface_RO.get("ip_address")
                ifacer["mac-address"] = interface_RO.get("mac_address")
            vnfr_update["vdur.{}".format(vdu_index)] = vdur

        for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
            net_RO = next((net for net in get_iterable(nsr_desc_RO, "nets")
                           if vld["id"] == net.get("vnf_net_osm_id")), None)
            if net_RO is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                    vnf_index, vld["id"]))
            for vld_key, RO_key in (("vim-id", "vim_net_id"), ("name", "vim_name"), ("status", "status"),
                                    ("status-detailed", "error_msg")):
                vld[vld_key] = net_RO.get(RO_key)
            vnfr_update["vld.{}".format(vld_index)] = vld

        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
630
631 @staticmethod
632 def _get_ns_config_info(vca_deployed_list):
633 """
634 Generates a mapping between vnf,vdu elements and the N2VC id
635 :param vca_deployed_list: List of database _admin.deploy.VCA that contains this list
636 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
637 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
638 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
639 """
640 mapping = {}
641 ns_config_info = {"osm-config-mapping": mapping}
642 for vca in vca_deployed_list:
643 if not vca["member-vnf-index"]:
644 continue
645 if not vca["vdu_id"]:
646 mapping[vca["member-vnf-index"]] = vca["application"]
647 else:
648 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
649 vca["application"]
650 return ns_config_info
651
652 @staticmethod
653 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
654 """
655 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
656 primitives as verify-ssh-credentials, or config when needed
657 :param desc_primitive_list: information of the descriptor
658 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
659 this element contains a ssh public key
660 :return: The modified list. Can ba an empty list, but always a list
661 """
662 if desc_primitive_list:
663 primitive_list = desc_primitive_list.copy()
664 else:
665 primitive_list = []
666 # look for primitive config, and get the position. None if not present
667 config_position = None
668 for index, primitive in enumerate(primitive_list):
669 if primitive["name"] == "config":
670 config_position = index
671 break
672
673 # for NS, add always a config primitive if not present (bug 874)
674 if not vca_deployed["member-vnf-index"] and config_position is None:
675 primitive_list.insert(0, {"name": "config", "parameter": []})
676 config_position = 0
677 # for VNF/VDU add verify-ssh-credentials after config
678 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
679 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
680 return primitive_list
681
682 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
683 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
684
685 db_nsr_update = {}
686 RO_descriptor_number = 0 # number of descriptors created at RO
687 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
688 start_deploy = time()
689 vdu_flag = False # If any of the VNFDs has VDUs
690 ns_params = db_nslcmop.get("operationParams")
691
692 # deploy RO
693
694 # get vnfds, instantiate at RO
695
696 for c_vnf in nsd.get("constituent-vnfd", ()):
697 member_vnf_index = c_vnf["member-vnf-index"]
698 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
699 if vnfd.get("vdu"):
700 vdu_flag = True
701 vnfd_ref = vnfd["id"]
702 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
703 " RO".format(vnfd_ref, member_vnf_index)
704 # self.logger.debug(logging_text + step)
705 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
706 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
707 RO_descriptor_number += 1
708
709 # look position at deployed.RO.vnfd if not present it will be appended at the end
710 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
711 if vnf_deployed["member-vnf-index"] == member_vnf_index:
712 break
713 else:
714 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
715 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
716
717 # look if present
718 RO_update = {"member-vnf-index": member_vnf_index}
719 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
720 if vnfd_list:
721 RO_update["id"] = vnfd_list[0]["uuid"]
722 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
723 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
724 else:
725 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
726 get("additionalParamsForVnf"), nsr_id)
727 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
728 RO_update["id"] = desc["uuid"]
729 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
730 vnfd_ref, member_vnf_index, desc["uuid"]))
731 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
732 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
733 self.update_db_2("nsrs", nsr_id, db_nsr_update)
734 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
735
736 # create nsd at RO
737 nsd_ref = nsd["id"]
738 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
739 # self.logger.debug(logging_text + step)
740
741 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
742 RO_descriptor_number += 1
743 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
744 if nsd_list:
745 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
746 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
747 nsd_ref, RO_nsd_uuid))
748 else:
749 nsd_RO = deepcopy(nsd)
750 nsd_RO["id"] = RO_osm_nsd_id
751 nsd_RO.pop("_id", None)
752 nsd_RO.pop("_admin", None)
753 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
754 member_vnf_index = c_vnf["member-vnf-index"]
755 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
756 for c_vld in nsd_RO.get("vld", ()):
757 for cp in c_vld.get("vnfd-connection-point-ref", ()):
758 member_vnf_index = cp["member-vnf-index-ref"]
759 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
760
761 desc = await self.RO.create("nsd", descriptor=nsd_RO)
762 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
763 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
764 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
765 self.update_db_2("nsrs", nsr_id, db_nsr_update)
766 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
767
768 # Crate ns at RO
769 # if present use it unless in error status
770 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
771 if RO_nsr_id:
772 try:
773 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
774 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
775 desc = await self.RO.show("ns", RO_nsr_id)
776 except ROclient.ROClientException as e:
777 if e.http_code != HTTPStatus.NOT_FOUND:
778 raise
779 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
780 if RO_nsr_id:
781 ns_status, ns_status_info = self.RO.check_ns_status(desc)
782 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
783 if ns_status == "ERROR":
784 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
785 .format(RO_nsr_id)
786 self.logger.debug(logging_text + step)
787 await self.RO.delete("ns", RO_nsr_id)
788 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
789 if not RO_nsr_id:
790 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
791 # self.logger.debug(logging_text + step)
792
793 # check if VIM is creating and wait look if previous tasks in process
794 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
795 if task_dependency:
796 step = "Waiting for related tasks to be completed: {}".format(task_name)
797 self.logger.debug(logging_text + step)
798 await asyncio.wait(task_dependency, timeout=3600)
799 if ns_params.get("vnf"):
800 for vnf in ns_params["vnf"]:
801 if "vimAccountId" in vnf:
802 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
803 vnf["vimAccountId"])
804 if task_dependency:
805 step = "Waiting for related tasks to be completed: {}".format(task_name)
806 self.logger.debug(logging_text + step)
807 await asyncio.wait(task_dependency, timeout=3600)
808
809 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
810
811 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
812
813 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
814 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
815 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
816 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
817 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
818 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
819 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
820 self.update_db_2("nsrs", nsr_id, db_nsr_update)
821 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
822
823 # wait until NS is ready
824 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
825 detailed_status_old = None
826 self.logger.debug(logging_text + step)
827
828 while time() <= start_deploy + self.total_deploy_timeout:
829 desc = await self.RO.show("ns", RO_nsr_id)
830 ns_status, ns_status_info = self.RO.check_ns_status(desc)
831 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
832 if ns_status == "ERROR":
833 raise ROclient.ROClientException(ns_status_info)
834 elif ns_status == "BUILD":
835 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
836 elif ns_status == "ACTIVE":
837 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
838 try:
839 if vdu_flag:
840 self.ns_update_vnfr(db_vnfrs, desc)
841 break
842 except LcmExceptionNoMgmtIP:
843 pass
844 else:
845 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
846 if detailed_status != detailed_status_old:
847 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
848 self.update_db_2("nsrs", nsr_id, db_nsr_update)
849 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
850 await asyncio.sleep(5, loop=self.loop)
851 else: # total_deploy_timeout
852 raise ROclient.ROClientException("Timeout waiting ns to be ready")
853
854 step = "Updating NSR"
855 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
856
857 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
858 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
859 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
860 self.update_db_2("nsrs", nsr_id, db_nsr_update)
861 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
862
863 step = "Deployed at VIM"
864 self.logger.debug(logging_text + step)
865
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for the ip address at RO and, optionally, insert a public key in the virtual machine.

    The method polls every 10 seconds, for at most 360 iterations (about 1 hour). On each iteration it:
      1. reads "_admin.deployed.RO.nsr_id" from the "nsrs" record until it is available;
      2. reads the "vnfrs" record and selects the target vdur (by management ip-address when vdu_id is
         not provided, otherwise by vdu-id-ref and count-index) until it is ACTIVE and has an ip-address;
      3. if both pub_key and user are provided, asks RO to inject the key, retrying up to 20 times when
         RO reports an error.

    :param logging_text: prefix use for logging
    :param nsr_id: _id of the "nsrs" database record
    :param vnfr_id: _id of the "vnfrs" database record
    :param vdu_id: vdu-id-ref of the target vdur. If empty, the vdur owning the VNF ip-address is used
    :param vdu_index: count-index of the target vdur (only used when vdu_id is provided)
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, if the vdur is not found or is in ERROR status, if RO returns an
        unknown response, or if the maximum number of key injection retries is reached
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            if not ro_nsr_id:
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
            # RO id was found: what timed out is the wait for the VM ip-address (or the key injection)
            raise LcmException("Timeout waiting for VM to be up and reachable for nsr_id: {}, vnfr_id: {}, "
                               "vdu_id: {}".format(nsr_id, vnfr_id, vdu_id))

        # The 'loop' argument of asyncio.sleep is deprecated since Python 3.8 and removed in 3.10; the
        # running loop is used implicitly
        await asyncio.sleep(10)
        # wait until NS is deployed at RO
        if not ro_nsr_id:
            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
        if not ro_nsr_id:
            continue

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                    vnfr_id, vdu_id, vdu_index
                ))

            if vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            # vdur exists but is neither ACTIVE nor ERROR yet: keep polling
            continue

        # self.logger.debug(logging_text + "IP address={}".format(ip_address))

        # nothing to inject: the ip address is all that was requested
        if not (pub_key and user):
            break

        # inject public key into machine
        # self.logger.debug(logging_text + "Inserting RO key")
        try:
            ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
            result_dict = await self.RO.create_action(
                item="ns",
                item_id_name=ro_nsr_id,
                descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
            )
            # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
            if not result_dict or not isinstance(result_dict, dict):
                raise LcmException("Unknown response from RO when injecting key")
            for result in result_dict.values():
                if result.get("vim_result") == 200:
                    break
                else:
                    raise ROclient.ROClientException("error injecting key: {}".format(
                        result.get("description")))
            break
        except ROclient.ROClientException as e:
            if not nb_tries:
                self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                  format(e, 20*10))
            nb_tries += 1
            if nb_tries >= 20:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e

    return ip_address
962
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
                           kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
    """
    Deploy and configure one charm (execution environment) at VCA for a NS, VNF, VDU or KDU.

    Steps: create (proxy charm) or register (native charm) the execution environment, store its model,
    application and ee_id at "_admin.deployed.VCA.<vca_index>" of the nsrs record, install the
    configuration software, wait for the VM to be up (injecting the ssh public key when
    "config-access.ssh-access.required" is set) and execute the initial config primitives ordered by 'seq'.

    :param logging_text: prefix use for logging
    :param vca_index: index of this charm inside db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: network slice instance id, or None. Used to build the namespace
    :param db_nsr: content of the nsrs database record
    :param db_vnfr: content of the vnfrs database record, or None for a NS-level charm
    :param vdu_id: vdu identifier, or None when the charm is not for a VDU
    :param kdu_name: kdu name (not used inside this method)
    :param vdu_index: index of the vdu instance
    :param config_descriptor: configuration section of the descriptor; must contain ["juju"]["charm"]
    :param deploy_params: parameters used to map the primitive parameters. "rw_mgmt_ip" (and
        "ns_config_info" for a NS charm) are added to this dictionary
    :param base_folder: dictionary with "folder" and "pkg-dir" used to compose the charm artifact path
    :return: None
    :raises Exception: any failure is re-raised as Exception with the failing step prepended
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = vca_deployed_list[vca_index]
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # db_vnfr is None for a NS-level charm, so it cannot be indexed unconditionally
    member_vnf_index = db_vnfr["member-vnf-index-ref"] if db_vnfr else None
    logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} ".format(member_vnf_index, vdu_id, vdu_index)

    step = ""
    try:
        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)
        if vnfr_id:
            namespace += "." + vnfr_id
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)

        # Get artifact path
        artifact_path = "{}/{}/charms/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            config_descriptor["juju"]["charm"]
        )

        # proxy charm unless the descriptor explicitly sets juju.proxy to False
        is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
        if deep_get(config_descriptor, ('juju', 'proxy')) is False:
            is_proxy_charm = False

        # n2vc_redesign STEP 3.1

        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # create or register execution environment in VCA
        if is_proxy_charm:
            step = "create execution environment"
            self.logger.debug(logging_text + step)
            ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
                                                                              reuse_ee_id=ee_id,
                                                                              db_dict=db_dict)
        else:
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and config_descriptor.get("initial-config-primitive"):
                for config_primitive in config_descriptor["initial-config-primitive"]:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
                                                                   db_dict=db_dict)

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask to N2VC instead of assuming the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        model_name = ee_id_parts[0]
        application_name = ee_id_parts[1]
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
                                          db_update_entry + "application": application_name,
                                          db_update_entry + "ee_id": ee_id})

        # n2vc_redesign STEP 3.3

        step = "Install configuration Software"
        # TODO check if already done
        self.logger.debug(logging_text + step)
        await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)

        # if SSH access is required, then get execution environment SSH public
        if is_proxy_charm:  # if native charm we have waited already to VM be UP
            pub_key = None
            user = None
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM"
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=user, pub_key=pub_key)
            else:
                # NS-level charm: there is no vnfr, hence no VM to wait for
                rw_mgmt_ip = None

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        # sort initial config primitives by 'seq'. Best effort: a failure is logged and the descriptor
        # order is kept. Skipped when the descriptor has no primitives, to avoid a spurious error log
        if initial_config_primitive_list:
            try:
                initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
            except Exception as e:
                self.logger.error(logging_text + step + ": " + str(e))

        # add config if not present for NS charm
        initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                vca_deployed)

        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = self._get_ns_config_info(vca_deployed_list)
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.n2vc.exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # TODO register in database that primitive is done

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

    except Exception as e:  # TODO not use Exception but N2VC exception
        # prepend the failing step so the caller can report where the deployment stopped
        raise Exception("{} {}".format(step, e)) from e
        # TODO raise N2VC exception with 'step' extra information
1119
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None):
    """
    Write the NS state and current operation fields into the "nsrs" database record.

    Best effort: any exception raised while updating the database is logged as a warning and not
    propagated.

    :param nsr_id: _id of the nsrs record to update
    :param ns_state: value for "nsState". If empty or None, "nsState" is left untouched
    :param current_operation: value for "currentOperation"
    :param current_operation_id: value for "currentOperationID"
    :param error_description: value for "errorDescription" (None clears it)
    :param error_detail: value for "errorDetail" (None clears it)
    :return: None
    """
    try:
        db_dict = dict()
        if ns_state:
            db_dict["nsState"] = ns_state
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail
        self.update_db_2("nsrs", nsr_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias; use Logger.warning
        self.logger.warning('Error writing NS status: {}'.format(e))
1133
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the "instantiate" operation of a network service.

    Reads the operation, the ns record, its vnf records and their vnfds from the database, launches
    the deployment tasks (KDUs, RO/VIM and one N2VC task per charm found at VNF, VDU, KDU and NS level),
    waits up to 3600 seconds for all of them and finally writes the result into the "nsrs" and
    "nslcmops" records and notifies it through the message bus ("ns" topic, "instantiated" key).

    Result written:
      - all tasks done without error: operationState "COMPLETED", nsState "READY";
      - some task failed, was cancelled or timed out: operationState "FAILED_TEMP", nsState "BROKEN";
      - an exception was raised by this method itself: operationState "FAILED", nsState "BROKEN".

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me')
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "instantiate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}     # vnf's info indexed by member-index
    # n2vc_info = {}
    task_instantiation_list = []
    task_instantiation_info = {}  # from task to info text
    exc = None
    # Both are read at the 'finally' clause, so they must exist even if the 'try' fails at its first step
    instantiated_ok = False
    error_text = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)

        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id
        )

        # read from db: operation
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        # read from db: ns
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd is replicated into ns (no db read)
        nsd = db_nsr["nsd"]
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        step = "Getting vnfrs from db"
        self.logger.debug(logging_text + step)
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}     # every vnfd data indexed by vnf name
        db_vnfds = {}         # every vnfd data indexed by vnf id

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr   # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]                       # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]                     # vnfd name for this vnf
            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from db
                step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + step)
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd     # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd          # vnfd's indexed by id

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        task_kdu = asyncio.ensure_future(
            self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                db_nsr=db_nsr,
                db_vnfrs=db_vnfrs,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
        task_instantiation_info[task_kdu] = "Deploy KDUs"
        task_instantiation_list.append(task_kdu)
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        # n2vc_redesign STEP 2 Deploy Network Scenario
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        task_instantiation_info[task_ro] = "Deploy at VIM"
        task_instantiation_list.append(task_ro)

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        step = "Looking for needed vnfd to configure with proxy charm"
        self.logger.debug(logging_text + step)

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI

        # keyword arguments shared by every _deploy_n2vc call of this operation
        n2vc_common = {
            "logging_text": logging_text,
            "db_nsr": db_nsr,
            "nslcmop_id": nslcmop_id,
            "nsr_id": nsr_id,
            "nsi_id": nsi_id,
            "task_instantiation_list": task_instantiation_list,
            "task_instantiation_info": task_instantiation_info,
        }

        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]

            # Get additional parameters
            deploy_params = {}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())

            # Deploy the VNF-level charm, if any
            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                self._deploy_n2vc(
                    db_vnfr=db_vnfr,
                    vnfd_id=vnfd_id,
                    vdu_id=None,
                    kdu_name=None,
                    member_vnf_index=member_vnf_index,
                    vdu_index=0,
                    vdu_name=None,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    **n2vc_common
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # vdur is None when the vnfr has no record for this vdu: use the VNF parameters then
                if vdur and vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    deploy_params_vdu = deploy_params
                if descriptor_config and descriptor_config.get("juju"):
                    # one charm per vdu instance
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            db_vnfr=db_vnfr,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=None,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=None,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            **n2vc_common
                        )

            # Deploy charms for each KDU that supports one.
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config and descriptor_config.get("juju"):
                    self._deploy_n2vc(
                        db_vnfr=db_vnfr,
                        vnfd_id=vnfd_id,
                        vdu_id=None,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=0,
                        vdu_name=None,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        **n2vc_common
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            # Get additional parameters
            deploy_params = {}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
            base_folder = nsd["_admin"]["storage"]
            # NS-level charm: there is neither vnfr, vnfd nor vdu
            self._deploy_n2vc(
                db_vnfr=None,
                vnfd_id=None,
                vdu_id=None,
                kdu_name=None,
                member_vnf_index=None,
                vdu_index=0,
                vdu_name=None,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                **n2vc_common
            )

        # Wait until all tasks of "task_instantiation_list" have been finished

        # while time() <= start_deploy + self.total_deploy_timeout:
        error_text_list = []
        timeout = 3600

        # let's begin with all OK
        instantiated_ok = True
        # let's begin with RO 'running' status (later we can change it)
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"

        def mark_failed(failed_task):
            # a failure of the RO task affects operational-status; any other task affects config-status
            status_key = "operational-status" if failed_task == task_ro else "config-status"
            db_nsr_update[status_key] = "failed"

        if task_instantiation_list:
            # wait for all tasks completion
            done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)

            for task in pending:
                instantiated_ok = False
                mark_failed(task)
                self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
                error_text_list.append(task_instantiation_info[task] + ": Timeout")
            for task in done:
                if task.cancelled():
                    instantiated_ok = False
                    mark_failed(task)
                    self.logger.warning(logging_text + task_instantiation_info[task] + ": Cancelled")
                    error_text_list.append(task_instantiation_info[task] + ": Cancelled")
                    continue
                # Use a name different from 'exc': that one is reserved for failures of this method
                # itself and decides at 'finally' whether the operation ends as FAILED
                task_exc = task.exception()
                if not task_exc:
                    self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
                    continue
                instantiated_ok = False
                mark_failed(task)
                self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
                if isinstance(task_exc, (N2VCException, ROclient.ROClientException)):
                    error_text_list.append(task_instantiation_info[task] + ": {}".format(task_exc))
                else:
                    # unexpected exception: log and report the whole traceback
                    exc_traceback = "".join(traceback.format_exception(None, task_exc, task_exc.__traceback__))
                    self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
                    error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)

        if error_text_list:
            error_text = "\n".join(error_text_list)
            db_nsr_update["detailed-status"] = error_text
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
            db_nslcmop_update["detailed-status"] = error_text
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            # all is done
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            db_nslcmop_update["detailed-status"] = "done"
            db_nsr_update["detailed-status"] = "done"

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                             exc_info=True)
    finally:
        if exc:
            # a failure of this method can never leave the NS as READY
            instantiated_ok = False
            error_text = error_text or str(exc)
            if db_nsr:
                db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["config-status"] = "failed"
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
                error_description = None
                error_detail = None
                if instantiated_ok:
                    ns_state = "READY"
                else:
                    ns_state = "BROKEN"
                    error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
                    error_detail = error_text
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description,
                    error_detail=error_detail
                )

            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1572
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """
    Launch the KDUs found in the VNF records and store the outcome at nsrs '_admin.deployed.K8s'.

    For every "kdur" entry of every vnfr an item is written at '_admin.deployed.K8s.<index>' and an install
    task is started on the helm or the juju connector. Afterwards it waits for those tasks and records either
    the returned "kdu-instance" or the failure text ("detailed-status") of each one.

    :param logging_text: prefix used for the log messages
    :param nsr_id: _id of the nsr record that is updated
    :param db_nsr: nsr content (not read by this method)
    :param db_vnfrs: dictionary whose values are vnfr records; each one may contain a "kdur" list
    :return: None
    :raises LcmException: on any unexpected error; the original exception is chained as the cause
    """
    # cache of clusters already resolved: {cluster_type: {k8scluster _id: id stored at _admin.<type>.id}}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # look at the cache first, then at database "k8sclusters"
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    # both names are read by the except/finally clauses, so they must exist before entering the try
    step = "Initializing '_admin.deployed.K8s' at database"
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}   # task -> database path prefix "_admin.deployed.K8s.<index>."
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                k8sclustertype_full = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
                                 " running"
                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                except LcmException as e:
                    error_text = str(e)
                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                    "k8scluster-type": k8sclustertype,
                                    "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instace_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # NOTE(review): index is not advanced here, so the record of this failed KDU is
                    # overwritten by the next KDU. Behaviour kept as it was; confirm whether it is intended
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
                           "{}".format(index)}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    # wrapped in a future, as the helm case, so that the very same object that is stored
                    # as key of pending_tasks is the one returned later by asyncio.wait
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1
        if not pending_tasks:
            return
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:   # timeout
                for task in pending_list:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)) \
            from e
    finally:
        # TODO Write in data base
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1674
1675 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1676 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1677 base_folder, task_instantiation_list, task_instantiation_info):
1678 # launch instantiate_N2VC in a asyncio task and register task object
1679 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1680 # if not found, create one entry and update database
1681
1682 # fill db_nsr._admin.deployed.VCA.<index>
1683 vca_index = -1
1684 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1685 if not vca_deployed:
1686 continue
1687 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1688 vca_deployed.get("vdu_id") == vdu_id and \
1689 vca_deployed.get("kdu_name") == kdu_name and \
1690 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1691 break
1692 else:
1693 # not found, create one.
1694 vca_deployed = {
1695 "member-vnf-index": member_vnf_index,
1696 "vdu_id": vdu_id,
1697 "kdu_name": kdu_name,
1698 "vdu_count_index": vdu_index,
1699 "operational-status": "init", # TODO revise
1700 "detailed-status": "", # TODO revise
1701 "step": "initial-deploy", # TODO revise
1702 "vnfd_id": vnfd_id,
1703 "vdu_name": vdu_name,
1704 }
1705 vca_index += 1
1706 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1707 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1708
1709 # Launch task
1710 task_n2vc = asyncio.ensure_future(
1711 self.instantiate_N2VC(
1712 logging_text=logging_text,
1713 vca_index=vca_index,
1714 nsi_id=nsi_id,
1715 db_nsr=db_nsr,
1716 db_vnfr=db_vnfr,
1717 vdu_id=vdu_id,
1718 kdu_name=kdu_name,
1719 vdu_index=vdu_index,
1720 deploy_params=deploy_params,
1721 config_descriptor=descriptor_config,
1722 base_folder=base_folder,
1723 )
1724 )
1725 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1726 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1727 task_instantiation_list.append(task_n2vc)
1728
1729 # Check if this VNFD has a configured terminate action
1730 def _has_terminate_config_primitive(self, vnfd):
1731 vnf_config = vnfd.get("vnf-configuration")
1732 if vnf_config and vnf_config.get("terminate-config-primitive"):
1733 return True
1734 else:
1735 return False
1736
1737 @staticmethod
1738 def _get_terminate_config_primitive_seq_list(vnfd):
1739 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1740 # No need to check for existing primitive twice, already done before
1741 vnf_config = vnfd.get("vnf-configuration")
1742 seq_list = vnf_config.get("terminate-config-primitive")
1743 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1744 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1745 return seq_list_sorted
1746
1747 @staticmethod
1748 def _create_nslcmop(nsr_id, operation, params):
1749 """
1750 Creates a ns-lcm-opp content to be stored at database.
1751 :param nsr_id: internal id of the instance
1752 :param operation: instantiate, terminate, scale, action, ...
1753 :param params: user parameters for the operation
1754 :return: dictionary following SOL005 format
1755 """
1756 # Raise exception if invalid arguments
1757 if not (nsr_id and operation and params):
1758 raise LcmException(
1759 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1760 now = time()
1761 _id = str(uuid4())
1762 nslcmop = {
1763 "id": _id,
1764 "_id": _id,
1765 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1766 "operationState": "PROCESSING",
1767 "statusEnteredTime": now,
1768 "nsInstanceId": nsr_id,
1769 "lcmOperationType": operation,
1770 "startTime": now,
1771 "isAutomaticInvocation": False,
1772 "operationParams": params,
1773 "isCancelPending": False,
1774 "links": {
1775 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1776 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1777 }
1778 }
1779 return nslcmop
1780
1781 def _format_additional_params(self, params):
1782 params = params or {}
1783 for key, value in params.items():
1784 if str(value).startswith("!!yaml "):
1785 params[key] = yaml.safe_load(value[7:])
1786 return params
1787
1788 def _get_terminate_primitive_params(self, seq, vnf_index):
1789 primitive = seq.get('name')
1790 primitive_params = {}
1791 params = {
1792 "member_vnf_index": vnf_index,
1793 "primitive": primitive,
1794 "primitive_params": primitive_params,
1795 }
1796 desc_params = {}
1797 return self._map_primitive_params(seq, params, desc_params)
1798
1799 # sub-operations
1800
1801 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1802 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1803 if (op.get('operationState') == 'COMPLETED'):
1804 # b. Skip sub-operation
1805 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1806 return self.SUBOPERATION_STATUS_SKIP
1807 else:
1808 # c. Reintent executing sub-operation
1809 # The sub-operation exists, and operationState != 'COMPLETED'
1810 # Update operationState = 'PROCESSING' to indicate a reintent.
1811 operationState = 'PROCESSING'
1812 detailed_status = 'In progress'
1813 self._update_suboperation_status(
1814 db_nslcmop, op_index, operationState, detailed_status)
1815 # Return the sub-operation index
1816 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1817 # with arguments extracted from the sub-operation
1818 return op_index
1819
1820 # Find a sub-operation where all keys in a matching dictionary must match
1821 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1822 def _find_suboperation(self, db_nslcmop, match):
1823 if (db_nslcmop and match):
1824 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1825 for i, op in enumerate(op_list):
1826 if all(op.get(k) == match[k] for k in match):
1827 return i
1828 return self.SUBOPERATION_STATUS_NOT_FOUND
1829
1830 # Update status for a sub-operation given its index
1831 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1832 # Update DB for HA tasks
1833 q_filter = {'_id': db_nslcmop['_id']}
1834 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1835 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1836 self.db.set_one("nslcmops",
1837 q_filter=q_filter,
1838 update_dict=update_dict,
1839 fail_on_empty=False)
1840
1841 # Add sub-operation, return the index of the added sub-operation
1842 # Optionally, set operationState, detailed-status, and operationType
1843 # Status and type are currently set for 'scale' sub-operations:
1844 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1845 # 'detailed-status' : status message
1846 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1847 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1848 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1849 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1850 RO_nsr_id=None, RO_scaling_info=None):
1851 if not (db_nslcmop):
1852 return self.SUBOPERATION_STATUS_NOT_FOUND
1853 # Get the "_admin.operations" list, if it exists
1854 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1855 op_list = db_nslcmop_admin.get('operations')
1856 # Create or append to the "_admin.operations" list
1857 new_op = {'member_vnf_index': vnf_index,
1858 'vdu_id': vdu_id,
1859 'vdu_count_index': vdu_count_index,
1860 'primitive': primitive,
1861 'primitive_params': mapped_primitive_params}
1862 if operationState:
1863 new_op['operationState'] = operationState
1864 if detailed_status:
1865 new_op['detailed-status'] = detailed_status
1866 if operationType:
1867 new_op['lcmOperationType'] = operationType
1868 if RO_nsr_id:
1869 new_op['RO_nsr_id'] = RO_nsr_id
1870 if RO_scaling_info:
1871 new_op['RO_scaling_info'] = RO_scaling_info
1872 if not op_list:
1873 # No existing operations, create key 'operations' with current operation as first list element
1874 db_nslcmop_admin.update({'operations': [new_op]})
1875 op_list = db_nslcmop_admin.get('operations')
1876 else:
1877 # Existing operations, append operation to list
1878 op_list.append(new_op)
1879
1880 db_nslcmop_update = {'_admin.operations': op_list}
1881 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1882 op_index = len(op_list) - 1
1883 return op_index
1884
1885 # Helper methods for scale() sub-operations
1886
1887 # pre-scale/post-scale:
1888 # Check for 3 different cases:
1889 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1890 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1891 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1892 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1893 operationType, RO_nsr_id=None, RO_scaling_info=None):
1894 # Find this sub-operation
1895 if (RO_nsr_id and RO_scaling_info):
1896 operationType = 'SCALE-RO'
1897 match = {
1898 'member_vnf_index': vnf_index,
1899 'RO_nsr_id': RO_nsr_id,
1900 'RO_scaling_info': RO_scaling_info,
1901 }
1902 else:
1903 match = {
1904 'member_vnf_index': vnf_index,
1905 'primitive': vnf_config_primitive,
1906 'primitive_params': primitive_params,
1907 'lcmOperationType': operationType
1908 }
1909 op_index = self._find_suboperation(db_nslcmop, match)
1910 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1911 # a. New sub-operation
1912 # The sub-operation does not exist, add it.
1913 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1914 # The following parameters are set to None for all kind of scaling:
1915 vdu_id = None
1916 vdu_count_index = None
1917 vdu_name = None
1918 if (RO_nsr_id and RO_scaling_info):
1919 vnf_config_primitive = None
1920 primitive_params = None
1921 else:
1922 RO_nsr_id = None
1923 RO_scaling_info = None
1924 # Initial status for sub-operation
1925 operationState = 'PROCESSING'
1926 detailed_status = 'In progress'
1927 # Add sub-operation for pre/post-scaling (zero or more operations)
1928 self._add_suboperation(db_nslcmop,
1929 vnf_index,
1930 vdu_id,
1931 vdu_count_index,
1932 vdu_name,
1933 vnf_config_primitive,
1934 primitive_params,
1935 operationState,
1936 detailed_status,
1937 operationType,
1938 RO_nsr_id,
1939 RO_scaling_info)
1940 return self.SUBOPERATION_STATUS_NEW
1941 else:
1942 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1943 # or op_index (operationState != 'COMPLETED')
1944 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1945
1946 # Helper methods for terminate()
1947
1948 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1949 """ Create a primitive with params from VNFD
1950 Called from terminate() before deleting instance
1951 Calls action() to execute the primitive """
1952 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1953 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1954 db_vnfds = {}
1955 # Loop over VNFRs
1956 for vnfr in db_vnfrs_list:
1957 vnfd_id = vnfr["vnfd-id"]
1958 vnf_index = vnfr["member-vnf-index-ref"]
1959 if vnfd_id not in db_vnfds:
1960 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1961 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1962 db_vnfds[vnfd_id] = vnfd
1963 vnfd = db_vnfds[vnfd_id]
1964 if not self._has_terminate_config_primitive(vnfd):
1965 continue
1966 # Get the primitive's sorted sequence list
1967 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
1968 for seq in seq_list:
1969 # For each sequence in list, get primitive and call _ns_execute_primitive()
1970 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
1971 vnf_index, seq.get("name"))
1972 self.logger.debug(logging_text + step)
1973 # Create the primitive for each sequence, i.e. "primitive": "touch"
1974 primitive = seq.get('name')
1975 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
1976 # The following 3 parameters are currently set to None for 'terminate':
1977 # vdu_id, vdu_count_index, vdu_name
1978 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1979 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1980 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1981 # Add sub-operation
1982 self._add_suboperation(db_nslcmop,
1983 nslcmop_id,
1984 vnf_index,
1985 vdu_id,
1986 vdu_count_index,
1987 vdu_name,
1988 primitive,
1989 mapped_primitive_params)
1990 # Sub-operations: Call _ns_execute_primitive() instead of action()
1991 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1992 # nsr_deployed = db_nsr["_admin"]["deployed"]
1993
1994 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
1995 # nsr_id, nslcmop_terminate_action_id)
1996 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
1997 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
1998 # if result not in result_ok:
1999 # raise LcmException(
2000 # "terminate_primitive_action for vnf_member_index={}",
2001 # " primitive={} fails with error {}".format(
2002 # vnf_index, seq.get("name"), result_detail))
2003
2004 # TODO: find ee_id
2005 ee_id = None
2006 try:
2007 await self.n2vc.exec_primitive(
2008 ee_id=ee_id,
2009 primitive_name=primitive,
2010 params_dict=mapped_primitive_params
2011 )
2012 except Exception as e:
2013 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2014 raise LcmException(
2015 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2016 .format(vnf_index, seq.get("name"), e),
2017 )
2018
2019 async def terminate(self, nsr_id, nslcmop_id):
2020
2021 # Try to lock HA task here
2022 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2023 if not task_is_locked_by_me:
2024 return
2025
2026 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2027 self.logger.debug(logging_text + "Enter")
2028 db_nsr = None
2029 db_nslcmop = None
2030 exc = None
2031 failed_detail = [] # annotates all failed error messages
2032 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2033 "_admin.current-operation": nslcmop_id,
2034 "_admin.operation-type": "terminate"}
2035 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2036 db_nslcmop_update = {}
2037 nslcmop_operation_state = None
2038 autoremove = False # autoremove after terminated
2039 pending_tasks = []
2040 try:
2041 # wait for any previous tasks in process
2042 step = "Waiting for previous operations to terminate"
2043 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2044
2045 self._write_ns_status(
2046 nsr_id=nsr_id,
2047 ns_state="TERMINATING",
2048 current_operation="TERMINATING",
2049 current_operation_id=nslcmop_id
2050 )
2051
2052 step = "Getting nslcmop={} from db".format(nslcmop_id)
2053 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2054 step = "Getting nsr={} from db".format(nsr_id)
2055 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2056 # nsd = db_nsr["nsd"]
2057 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2058 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2059 return
2060 # #TODO check if VIM is creating and wait
2061 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2062 # Call internal terminate action
2063 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2064
2065 pending_tasks = []
2066
2067 db_nsr_update["operational-status"] = "terminating"
2068 db_nsr_update["config-status"] = "terminating"
2069
2070 # remove NS
2071 try:
2072 step = "delete execution environment"
2073 self.logger.debug(logging_text + step)
2074
2075 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2076 pending_tasks.append(task_delete_ee)
2077 except Exception as e:
2078 msg = "Failed while deleting NS in VCA: {}".format(e)
2079 self.logger.error(msg)
2080 failed_detail.append(msg)
2081
2082 try:
2083 # Delete from k8scluster
2084 step = "delete kdus"
2085 self.logger.debug(logging_text + step)
2086 # print(nsr_deployed)
2087 if nsr_deployed:
2088 for kdu in nsr_deployed.get("K8s", ()):
2089 kdu_instance = kdu.get("kdu-instance")
2090 if not kdu_instance:
2091 continue
2092 if kdu.get("k8scluster-type") == "chart":
2093 task_delete_kdu_instance = asyncio.ensure_future(
2094 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2095 kdu_instance=kdu_instance))
2096 elif kdu.get("k8scluster-type") == "juju":
2097 task_delete_kdu_instance = asyncio.ensure_future(
2098 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2099 kdu_instance=kdu_instance))
2100 else:
2101 self.error(logging_text + "Unknown k8s deployment type {}".
2102 format(kdu.get("k8scluster-type")))
2103 continue
2104 pending_tasks.append(task_delete_kdu_instance)
2105 except LcmException as e:
2106 msg = "Failed while deleting KDUs from NS: {}".format(e)
2107 self.logger.error(msg)
2108 failed_detail.append(msg)
2109
2110 # remove from RO
2111 RO_fail = False
2112
2113 # Delete ns
2114 RO_nsr_id = RO_delete_action = None
2115 if nsr_deployed and nsr_deployed.get("RO"):
2116 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2117 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2118 try:
2119 if RO_nsr_id:
2120 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2121 "Deleting ns from VIM"
2122 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2123 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2124 self.logger.debug(logging_text + step)
2125 desc = await self.RO.delete("ns", RO_nsr_id)
2126 RO_delete_action = desc["action_id"]
2127 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2128 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2129 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2130 if RO_delete_action:
2131 # wait until NS is deleted from VIM
2132 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2133 format(RO_nsr_id, RO_delete_action)
2134 detailed_status_old = None
2135 self.logger.debug(logging_text + step)
2136
2137 delete_timeout = 20 * 60 # 20 minutes
2138 while delete_timeout > 0:
2139 desc = await self.RO.show(
2140 "ns",
2141 item_id_name=RO_nsr_id,
2142 extra_item="action",
2143 extra_item_id=RO_delete_action)
2144 ns_status, ns_status_info = self.RO.check_action_status(desc)
2145 if ns_status == "ERROR":
2146 raise ROclient.ROClientException(ns_status_info)
2147 elif ns_status == "BUILD":
2148 detailed_status = step + "; {}".format(ns_status_info)
2149 elif ns_status == "ACTIVE":
2150 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2151 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2152 break
2153 else:
2154 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2155 if detailed_status != detailed_status_old:
2156 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2157 db_nsr_update["detailed-status"] = detailed_status
2158 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2159 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2160 await asyncio.sleep(5, loop=self.loop)
2161 delete_timeout -= 5
2162 else: # delete_timeout <= 0:
2163 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2164
2165 except ROclient.ROClientException as e:
2166 if e.http_code == 404: # not found
2167 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2168 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2169 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2170 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2171 elif e.http_code == 409: # conflict
2172 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2173 self.logger.debug(logging_text + failed_detail[-1])
2174 RO_fail = True
2175 else:
2176 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2177 self.logger.error(logging_text + failed_detail[-1])
2178 RO_fail = True
2179
2180 # Delete nsd
2181 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2182 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2183 try:
2184 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2185 "Deleting nsd from RO"
2186 await self.RO.delete("nsd", RO_nsd_id)
2187 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2188 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2189 except ROclient.ROClientException as e:
2190 if e.http_code == 404: # not found
2191 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2192 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2193 elif e.http_code == 409: # conflict
2194 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2195 self.logger.debug(logging_text + failed_detail[-1])
2196 RO_fail = True
2197 else:
2198 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2199 self.logger.error(logging_text + failed_detail[-1])
2200 RO_fail = True
2201
2202 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2203 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2204 if not vnf_deployed or not vnf_deployed["id"]:
2205 continue
2206 try:
2207 RO_vnfd_id = vnf_deployed["id"]
2208 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2209 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2210 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2211 await self.RO.delete("vnfd", RO_vnfd_id)
2212 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2213 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2214 except ROclient.ROClientException as e:
2215 if e.http_code == 404: # not found
2216 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2217 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2218 elif e.http_code == 409: # conflict
2219 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2220 self.logger.debug(logging_text + failed_detail[-1])
2221 else:
2222 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2223 self.logger.error(logging_text + failed_detail[-1])
2224
2225 if failed_detail:
2226 terminate_ok = False
2227 self.logger.error(logging_text + " ;".join(failed_detail))
2228 db_nsr_update["operational-status"] = "failed"
2229 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2230 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2231 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2232 db_nslcmop_update["statusEnteredTime"] = time()
2233 else:
2234 terminate_ok = True
2235 db_nsr_update["operational-status"] = "terminated"
2236 db_nsr_update["detailed-status"] = "Done"
2237 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2238 db_nslcmop_update["detailed-status"] = "Done"
2239 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2240 db_nslcmop_update["statusEnteredTime"] = time()
2241 if db_nslcmop["operationParams"].get("autoremove"):
2242 autoremove = True
2243
2244 except (ROclient.ROClientException, DbException, LcmException) as e:
2245 self.logger.error(logging_text + "Exit Exception {}".format(e))
2246 exc = e
2247 except asyncio.CancelledError:
2248 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2249 exc = "Operation was cancelled"
2250 except Exception as e:
2251 exc = traceback.format_exc()
2252 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2253 finally:
2254 if exc and db_nslcmop:
2255 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2256 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2257 db_nslcmop_update["statusEnteredTime"] = time()
2258 try:
2259 if db_nslcmop and db_nslcmop_update:
2260 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2261 if db_nsr:
2262 db_nsr_update["_admin.nslcmop"] = None
2263 db_nsr_update["_admin.current-operation"] = None
2264 db_nsr_update["_admin.operation-type"] = None
2265 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2266
2267 if terminate_ok:
2268 ns_state = "IDLE"
2269 error_description = None
2270 error_detail = None
2271 else:
2272 ns_state = "BROKEN"
2273 error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
2274 error_detail = "; ".join(failed_detail)
2275
2276 self._write_ns_status(
2277 nsr_id=nsr_id,
2278 ns_state=ns_state,
2279 current_operation="IDLE",
2280 current_operation_id=None,
2281 error_description=error_description,
2282 error_detail=error_detail
2283 )
2284
2285 except DbException as e:
2286 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2287 if nslcmop_operation_state:
2288 try:
2289 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2290 "operationState": nslcmop_operation_state,
2291 "autoremove": autoremove},
2292 loop=self.loop)
2293 except Exception as e:
2294 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2295
2296 # wait for pending tasks
2297 done = None
2298 pending = None
2299 if pending_tasks:
2300 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2301 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2302 if not pending:
2303 self.logger.debug(logging_text + 'All tasks finished...')
2304 else:
2305 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2306
2307 self.logger.debug(logging_text + "Exit")
2308 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2309
2310 @staticmethod
2311 def _map_primitive_params(primitive_desc, params, instantiation_params):
2312 """
2313 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2314 The default-value is used. If it is between < > it look for a value at instantiation_params
2315 :param primitive_desc: portion of VNFD/NSD that describes primitive
2316 :param params: Params provided by user
2317 :param instantiation_params: Instantiation params provided by user
2318 :return: a dictionary with the calculated params
2319 """
2320 calculated_params = {}
2321 for parameter in primitive_desc.get("parameter", ()):
2322 param_name = parameter["name"]
2323 if param_name in params:
2324 calculated_params[param_name] = params[param_name]
2325 elif "default-value" in parameter or "value" in parameter:
2326 if "value" in parameter:
2327 calculated_params[param_name] = parameter["value"]
2328 else:
2329 calculated_params[param_name] = parameter["default-value"]
2330 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2331 and calculated_params[param_name].endswith(">"):
2332 if calculated_params[param_name][1:-1] in instantiation_params:
2333 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2334 else:
2335 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2336 format(calculated_params[param_name], primitive_desc["name"]))
2337 else:
2338 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2339 format(param_name, primitive_desc["name"]))
2340
2341 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2342 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2343 width=256)
2344 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2345 calculated_params[param_name] = calculated_params[param_name][7:]
2346
2347 # add always ns_config_info if primitive name is config
2348 if primitive_desc["name"] == "config":
2349 if "ns_config_info" in instantiation_params:
2350 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2351 return calculated_params
2352
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
    """
    Execute a primitive at the execution environment of the deployed charm that matches the given target.

    :param db_deployed: dictionary whose "VCA" list contains the deployed charm records to search
    :param member_vnf_index: compared with "member-vnf-index" of each VCA record
    :param vdu_id: compared with "vdu_id" of each VCA record
    :param vdu_name: when not empty, compared with "vdu_name" of each VCA record
    :param vdu_count_index: when not None, compared with "vdu_count_index" of each VCA record. 0 is a valid index
    :param primitive: name of the primitive. For "config", params are sent wrapped as {"params": primitive_params}
    :param primitive_params: dictionary with the parameters of the primitive
    :param retries: number of additional attempts after a failed execution
    :param retries_interval: seconds to wait between two attempts
    :return: tuple (detail, status): (output of the primitive, 'OK') on success or (error text, 'FAIL') otherwise.
        Errors are not raised, with the exception of asyncio.CancelledError that is always propagated
    """
    try:
        # find vca_deployed record for this action
        for vca_deployed in db_deployed["VCA"]:
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # compare against None: index 0 is falsy, but it is a valid instance to be targeted
            if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            # vca_deployed not found
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        # get ee_id
        ee_id = vca_deployed.get("ee_id")
        if not ee_id:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "execution environment"
                               .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))

        if primitive == "config":
            primitive_params = {"params": primitive_params}

        while retries >= 0:
            try:
                output = await self.n2vc.exec_primitive(
                    ee_id=ee_id,
                    primitive_name=primitive,
                    params_dict=primitive_params
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                # before python 3.8 CancelledError derives from Exception: never retry a cancelled task
                raise
            except Exception as e:
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry. The 'loop' argument is not used because it was removed at python 3.10
                    await asyncio.sleep(retries_interval)
                else:
                    return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'

        return output, 'OK'

    except asyncio.CancelledError:
        # let the caller manage the cancellation instead of reporting it as a failed primitive
        raise
    except Exception as e:
        return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2405
async def action(self, nsr_id, nslcmop_id):
    """
    Run the action described at the nslcmop 'nslcmop_id' over the ns 'nsr_id'.

    The target is chosen from the nslcmop "operationParams": with "vdu_id" the primitive is looked up at the vdu
    configuration; with "kdu_name" one of upgrade/rollback/status is run over the KDU; with only "member_vnf_index"
    at the vnf configuration; and with none of them at the ns configuration.
    The outcome is stored at the "nslcmops" entry and, when an operation state is reached, it is notified with a
    "ns"/"actioned" message.
    :param nsr_id: _id of the "nsrs" database entry
    :param nslcmop_id: _id of the "nslcmops" database entry
    :return: None when lcm_tasks.lock_HA does not grant the task. Otherwise the tuple
        (nslcmop_operation_state, nslcmop_operation_state_detail)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "action"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    nslcmop_operation_state_detail = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        operation_params = db_nslcmop["operationParams"]
        vnf_index = operation_params.get("member_vnf_index")
        vdu_id = operation_params.get("vdu_id")
        kdu_name = operation_params.get("kdu_name")
        vdu_count_index = operation_params.get("vdu_count_index")
        vdu_name = operation_params.get("vdu_name")

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            if vdu_id or kdu_name:
                # vdu and kdu primitives are read from the vnfd/vnfr, that cannot be located without the index
                raise LcmException("member_vnf_index is needed to execute a primitive over vdu_id={} kdu_name={}"
                                   .format(vdu_id, kdu_name))
            if db_nsr.get("nsd"):
                db_nsd = db_nsr.get("nsd")  # TODO this will be removed
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        elif kdu_name:
            self.logger.debug(logging_text + "Checking actions in KDUs")
            kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x["kdu-name"] == kdu_name), None)
            if kdur is None:
                raise LcmException("KDU '{}' not found".format(kdu_name))
            desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name != kdu["kdu-name"]:
                    continue
                if primitive not in ("upgrade", "rollback", "status"):
                    # otherwise an unsupported primitive would be reported as a KDU that is not found
                    raise LcmException("Primitive {} is not supported for KDU '{}'".format(primitive, kdu_name))

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                kdu_model = None
                if primitive == "upgrade":
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.pop("kdu_model")
                    else:
                        kdu_model = kdu.get("kdu-model")
                        # a deployed model written as 'name:something' is upgraded using only 'name'
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                # the same three operations are offered by the helm ('chart') and by the juju connector
                if kdu.get("k8scluster-type") == "chart":
                    k8s_connector = self.k8sclusterhelm
                elif kdu.get("k8scluster-type") == "juju":
                    k8s_connector = self.k8sclusterjuju
                else:
                    msg = "k8scluster-type not defined"
                    raise LcmException(msg)

                if primitive == "upgrade":
                    output = await k8s_connector.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                         kdu_instance=kdu.get("kdu-instance"),
                                                         atomic=True, kdu_model=kdu_model,
                                                         params=desc_params, db_dict=db_dict,
                                                         timeout=300)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
                elif primitive == "rollback":
                    output = await k8s_connector.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                          kdu_instance=kdu.get("kdu-instance"),
                                                          db_dict=db_dict)
                else:  # status
                    output = await k8s_connector.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
                                                            kdu_instance=kdu.get("kdu-instance"))
                break
            else:
                raise LcmException("KDU '{}' not found".format(kdu_name))

            # the state is also stored at the local variables, so that it is notified and returned at 'finally'
            if output:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = output
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'COMPLETED'
            else:
                db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = ''
                db_nslcmop_update["operationState"] = nslcmop_operation_state = 'FAILED'
            db_nslcmop_update["statusEnteredTime"] = time()
            return  # database update is called inside finally
        elif vnf_index:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        # additional params provided at instantiation time; vdu level ones replace the vnf level ones
        desc_params = {}
        if vnf_index:
            if db_vnfr.get("additionalParamsForVnf"):
                desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
            if vdu_id:
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    raise LcmException("vdu_id={} not found at vnfr with member_vnf_index={}"
                                       .format(vdu_id, vnf_index))
                if vdur.get("additionalParams"):
                    desc_params = self._format_additional_params(vdur["additionalParams"])
        else:
            if db_nsr.get("additionalParamsForNs"):
                desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))

        # TODO check if ns is in a proper status
        output, detail = await self._ns_execute_primitive(
            db_deployed=nsr_deployed,
            member_vnf_index=vnf_index,
            vdu_id=vdu_id,
            vdu_name=vdu_name,
            vdu_count_index=vdu_count_index,
            primitive=primitive,
            primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))

        detailed_status = output
        result = 'COMPLETED' if detail == 'OK' else 'FAILED'

        db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
                "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.current-operation"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # returning here replaces the bare 'return' of the try block, so the caller always gets the final state
        return nslcmop_operation_state, nslcmop_operation_state_detail
2649
async def scale(self, nsr_id, nslcmop_id):
    """
    Run the scale operation described at the nslcmop 'nslcmop_id' over the ns 'nsr_id'.

    Steps: read "scaleVnfData" from the nslcmop "operationParams"; locate the scaling-group-descriptor at the vnfd;
    check the max/min-instance-count limits; run the pre-scale scaling-config-actions; send a "vdu-scaling" action
    to RO and wait (at most one hour) until it completes and the vnfr is updated; run the post-scale
    scaling-config-actions. Every step is registered as a sub-operation of the nslcmop so that it can be skipped or
    repeated in a later attempt. The outcome is stored at "nslcmops" and "nsrs" and notified with a "ns"/"scaled"
    message.
    :param nsr_id: _id of the "nsrs" database entry
    :param nslcmop_id: _id of the "nslcmops" database entry
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "scale"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: scale-out operations currently applied over this scaling group, stored at nsr _admin
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        else:
            # without a direction nothing of what follows can be done
            raise LcmException("input parameter 'scaleVnfData':'scaleVnfType':'{}' is not SCALE_OUT nor SCALE_IN"
                               .format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # the copy is used as a countdown to pick, from the end of the list, the vdur that will be deleted
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the original (not decremented) counters, removing them from the info sent to the primitives
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was looked for, not the loop variable (unbound for an empty list)
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): arguments follow the positional layout of the SCALE-RO call below
                    # (db_nslcmop, vnf_index, primitive, params, type); confirm against the helper signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        # _ns_execute_primitive returns (detail, 'OK' | 'FAIL'): map it to an operationState
                        result_detail, exec_status = await self._ns_execute_primitive(
                            nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
                        result = 'COMPLETED' if exec_status == 'OK' else 'FAILED'
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO reintent check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # Reintent: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    self.logger.debug(logging_text + "Sub-operation RO reintent")

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600  # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)
                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            # an exception is used instead of 'assert', that is removed when running with -O
                            raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                    else:
                        # the RO action is done ("ACTIVE"); the status is not asked again, it only remains to
                        # update the vnfr and to wait until the management IP address is available
                        step = detailed_status = \
                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
                        if not vnfr_scaled:
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            vnfr_scaled = True
                        try:
                            desc = await self.RO.show("ns", RO_nsr_id)
                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                            break
                        except LcmExceptionNoMgmtIP:
                            pass
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    # the 'loop' argument is not used because it was removed at python 3.10
                    await asyncio.sleep(5)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same four arguments as the rest of calls to _update_suboperation_status
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name that was looked for, not the loop variable (unbound for an empty list)
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(scaling_group,
                                                                                                 vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale reintent check: Check if this sub-operation has been executed before
                    # NOTE(review): arguments follow the positional layout of the SCALE-RO call above
                    # (db_nslcmop, vnf_index, primitive, params, type); confirm against the helper signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # Reintent: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        # _ns_execute_primitive returns (detail, 'OK' | 'FAIL'): map it to an operationState
                        result_detail, exec_status = await self._ns_execute_primitive(
                            nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
                        result = 'COMPLETED' if exec_status == 'OK' else 'FAILED'
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update is called inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                if scale_process:
                    # mark as failed the part (charm configuration or RO deployment) that was in progress
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None
                )

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")